diff --git a/.changelog/8603.txt b/.changelog/8603.txt new file mode 100644 index 0000000000..ffe9a9401f --- /dev/null +++ b/.changelog/8603.txt @@ -0,0 +1,3 @@ +```release-note:feature +telemetry: track node and service counts and emit them as metrics +``` diff --git a/agent/consul/config.go b/agent/consul/config.go index a48effe441..4316475651 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -443,6 +443,10 @@ type Config struct { // dead servers. AutopilotInterval time.Duration + // MetricsReportingInterval is the frequency with which the server will + // report usage metrics to the configured go-metrics Sinks. + MetricsReportingInterval time.Duration + // ConnectEnabled is whether to enable Connect features such as the CA. ConnectEnabled bool @@ -589,11 +593,16 @@ func DefaultConfig() *Config { }, }, - ServerHealthInterval: 2 * time.Second, - AutopilotInterval: 10 * time.Second, - DefaultQueryTime: 300 * time.Second, - MaxQueryTime: 600 * time.Second, - EnterpriseConfig: DefaultEnterpriseConfig(), + // Stay under the 10 second aggregation interval of + // go-metrics. This ensures we always report the + // usage metrics in each cycle. + MetricsReportingInterval: 9 * time.Second, + ServerHealthInterval: 2 * time.Second, + AutopilotInterval: 10 * time.Second, + DefaultQueryTime: 300 * time.Second, + MaxQueryTime: 600 * time.Second, + + EnterpriseConfig: DefaultEnterpriseConfig(), } // Increase our reap interval to 3 days instead of 24h. diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index e845c41c91..f798a0efaa 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -654,6 +654,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.NoError(t, err) require.Equal(t, fedState2, fedStateLoaded2) + // Verify usage data is correctly updated + idx, nodeCount, err := fsm2.state.NodeCount() + require.NoError(t, err) + require.Equal(t, len(nodes), nodeCount) + require.NotZero(t, idx) + // Snapshot snap, err = fsm2.Snapshot() require.NoError(t, err) diff --git a/agent/consul/server.go b/agent/consul/server.go index 04d3b61bd3..c1c1a6d76e 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/usagemetrics" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" @@ -589,6 +590,19 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) { return nil, err } + reporter, err := usagemetrics.NewUsageMetricsReporter( + new(usagemetrics.Config). + WithStateProvider(s.fsm). + WithLogger(s.logger). + WithDatacenter(s.config.Datacenter). + WithReportingInterval(s.config.MetricsReportingInterval), + ) + if err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to start usage metrics reporter: %v", err) + } + go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + // Initialize Autopilot. This must happen before starting leadership monitoring // as establishing leadership could attempt to use autopilot and cause a panic. s.initAutopilot(config) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index be4f4348e2..895da9e069 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -15,6 +15,13 @@ type ReadTxn interface { Abort() } +// WriteTxn is implemented by memdb.Txn to perform write operations. +type WriteTxn interface { + ReadTxn + Insert(table string, obj interface{}) error + Commit() error +} + // Changes wraps a memdb.Changes to include the index at which these changes // were made. type Changes struct { @@ -24,8 +31,9 @@ type Changes struct { } // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on -// all write transactions. When the transaction is committed the changes are -// sent to the eventPublisher which will create and emit change events. +// all write transactions. When the transaction is committed the changes are: +// 1. Used to update our internal usage tracking +// 2. Sent to the eventPublisher which will create and emit change events type changeTrackerDB struct { db *memdb.MemDB publisher eventPublisher @@ -89,17 +97,21 @@ func (c *changeTrackerDB) publish(changes Changes) error { return nil } -// WriteTxnRestore returns a wrapped RW transaction that does NOT have change -// tracking enabled. This should only be used in Restore where we need to -// replace the entire contents of the Store without a need to track the changes. -// WriteTxnRestore uses a zero index since the whole restore doesn't really occur -// at one index - the effect is to write many values that were previously -// written across many indexes. +// WriteTxnRestore returns a wrapped RW transaction that should only be used in +// Restore where we need to replace the entire contents of the Store. +// WriteTxnRestore uses a zero index since the whole restore doesn't really +// occur at one index - the effect is to write many values that were previously +// written across many indexes. WriteTxnRestore also does not publish any +// change events to subscribers. func (c *changeTrackerDB) WriteTxnRestore() *txn { - return &txn{ + t := &txn{ Txn: c.db.Txn(true), Index: 0, } + + // We enable change tracking so that usage data is correctly populated. + t.Txn.TrackChanges() + return t } // txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. @@ -125,14 +137,21 @@ type txn struct { // by the caller. A non-nil error indicates that a commit failed and was not // applied. func (tx *txn) Commit() error { + changes := Changes{ + Index: tx.Index, + Changes: tx.Txn.Changes(), + } + + if len(changes.Changes) > 0 { + if err := updateUsage(tx, changes); err != nil { + return err + } + } + // publish may be nil if this is a read-only or WriteTxnRestore transaction. // In those cases changes should also be empty, and there will be nothing // to publish. if tx.publish != nil { - changes := Changes{ - Index: tx.Index, - Changes: tx.Txn.Changes(), - } if err := tx.publish(changes); err != nil { return err } diff --git a/agent/consul/state/operations_oss.go b/agent/consul/state/operations_oss.go index 4c382694b9..30deb70683 100644 --- a/agent/consul/state/operations_oss.go +++ b/agent/consul/state/operations_oss.go @@ -7,30 +7,30 @@ import ( "github.com/hashicorp/go-memdb" ) -func firstWithTxn(tx *txn, +func firstWithTxn(tx ReadTxn, table, index, idxVal string, entMeta *structs.EnterpriseMeta) (interface{}, error) { return tx.First(table, index, idxVal) } -func firstWatchWithTxn(tx *txn, +func firstWatchWithTxn(tx ReadTxn, table, index, idxVal string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) { return tx.FirstWatch(table, index, idxVal) } -func firstWatchCompoundWithTxn(tx *txn, +func firstWatchCompoundWithTxn(tx ReadTxn, table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (<-chan struct{}, interface{}, error) { return tx.FirstWatch(table, index, idxVals...) } -func getWithTxn(tx *txn, +func getWithTxn(tx ReadTxn, table, index, idxVal string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { return tx.Get(table, index, idxVal) } -func getCompoundWithTxn(tx *txn, table, index string, +func getCompoundWithTxn(tx ReadTxn, table, index string, _ *structs.EnterpriseMeta, idxVals ...interface{}) (memdb.ResultIterator, error) { return tx.Get(table, index, idxVals...) diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go new file mode 100644 index 0000000000..6e43f3729f --- /dev/null +++ b/agent/consul/state/usage.go @@ -0,0 +1,258 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" +) + +const ( + serviceNamesUsageTable = "service-names" +) + +// usageTableSchema returns a new table schema used for tracking various indexes +// for the Raft log. +func usageTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "usage", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "ID", + Lowercase: true, + }, + }, + }, + } +} + +func init() { + registerSchema(usageTableSchema) +} + +// UsageEntry represents a count of some arbitrary identifier within the +// state store, along with the last seen index. +type UsageEntry struct { + ID string + Index uint64 + Count int +} + +// ServiceUsage contains all of the usage data related to services +type ServiceUsage struct { + Services int + ServiceInstances int + EnterpriseServiceUsage +} + +type uniqueServiceState int + +const ( + NoChange uniqueServiceState = 0 + Deleted uniqueServiceState = 1 + Created uniqueServiceState = 2 +) + +// updateUsage takes a set of memdb changes and computes a delta for specific +// usage metrics that we track. +func updateUsage(tx WriteTxn, changes Changes) error { + usageDeltas := make(map[string]int) + for _, change := range changes.Changes { + var delta int + if change.Created() { + delta = 1 + } else if change.Deleted() { + delta = -1 + } + + switch change.Table { + case "nodes": + usageDeltas[change.Table] += delta + case "services": + svc := changeObject(change).(*structs.ServiceNode) + usageDeltas[change.Table] += delta + serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.ServiceName, &svc.EnterpriseMeta) + if err != nil { + return err + } + + var serviceState uniqueServiceState + if serviceIter.Next() == nil { + // If no services exist, we know we deleted the last service + // instance. + serviceState = Deleted + usageDeltas[serviceNamesUsageTable] -= 1 + } else if serviceIter.Next() == nil { + // If a second call to Next() returns nil, we know only a single + // instance exists. If, in addition, a new service name has been + // registered, either via creating a new service instance or via + // renaming an existing service, than we update our service count. + // + // We only care about two cases here: + // 1. A new service instance has been created with a unique name + // 2. An existing service instance has been updated with a new unique name + // + // These are the only ways a new unique service can be created. The + // other valid cases here: an update that does not change the service + // name, and a deletion, both do not impact the count of unique service + // names in the system. + + if change.Created() { + // Given a single existing service instance of the service: If a + // service has just been created, then we know this is a new unique + // service. + serviceState = Created + usageDeltas[serviceNamesUsageTable] += 1 + } else if serviceNameChanged(change) { + // Given a single existing service instance of the service: If a + // service has been updated with a new service name, then we know + // this is a new unique service. + serviceState = Created + usageDeltas[serviceNamesUsageTable] += 1 + + // Check whether the previous name was deleted in this rename, this + // is a special case of renaming a service which does not result in + // changing the count of unique service names. + before := change.Before.(*structs.ServiceNode) + beforeSvc, err := firstWithTxn(tx, servicesTableName, "service", before.ServiceName, &before.EnterpriseMeta) + if err != nil { + return err + } + if beforeSvc == nil { + usageDeltas[serviceNamesUsageTable] -= 1 + // set serviceState to NoChange since we have both gained and lost a + // service, cancelling each other out + serviceState = NoChange + } + } + } + addEnterpriseServiceUsage(usageDeltas, change, serviceState) + } + } + + idx := changes.Index + // This will happen when restoring from a snapshot, just take the max index + // of the tables we are tracking. + if idx == 0 { + idx = maxIndexTxn(tx, "nodes", servicesTableName) + } + + return writeUsageDeltas(tx, idx, usageDeltas) +} + +// serviceNameChanged returns a boolean that indicates whether the +// provided change resulted in an update to the service's service name. +func serviceNameChanged(change memdb.Change) bool { + if change.Updated() { + before := change.Before.(*structs.ServiceNode) + after := change.After.(*structs.ServiceNode) + return before.ServiceName != after.ServiceName + } + + return false +} + +// writeUsageDeltas will take in a map of IDs to deltas and update each +// entry accordingly, checking for integer underflow. The index that is +// passed in will be recorded on the entry as well. +func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error { + for id, delta := range usageDeltas { + u, err := tx.First("usage", "id", id) + if err != nil { + return fmt.Errorf("failed to retrieve existing usage entry: %s", err) + } + + if u == nil { + if delta < 0 { + return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + } + err := tx.Insert("usage", &UsageEntry{ + ID: id, + Count: delta, + Index: idx, + }) + if err != nil { + return fmt.Errorf("failed to update usage entry: %s", err) + } + } else if cur, ok := u.(*UsageEntry); ok { + if cur.Count+delta < 0 { + return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id) + } + err := tx.Insert("usage", &UsageEntry{ + ID: id, + Count: cur.Count + delta, + Index: idx, + }) + if err != nil { + return fmt.Errorf("failed to update usage entry: %s", err) + } + } + } + return nil +} + +// NodeCount returns the latest seen Raft index, a count of the number of nodes +// registered, and any errors. +func (s *Store) NodeCount() (uint64, int, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + nodeUsage, err := firstUsageEntry(tx, "nodes") + if err != nil { + return 0, 0, fmt.Errorf("failed nodes lookup: %s", err) + } + return nodeUsage.Index, nodeUsage.Count, nil +} + +// ServiceUsage returns the latest seen Raft index, a compiled set of service +// usage data, and any errors. +func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + serviceInstances, err := firstUsageEntry(tx, servicesTableName) + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + services, err := firstUsageEntry(tx, serviceNamesUsageTable) + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + usage := ServiceUsage{ + ServiceInstances: serviceInstances.Count, + Services: services.Count, + } + results, err := compileEnterpriseUsage(tx, usage) + if err != nil { + return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) + } + + return serviceInstances.Index, results, nil +} + +func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) { + usage, err := tx.First("usage", "id", id) + if err != nil { + return nil, err + } + + // If no elements have been inserted, the usage entry will not exist. We + // return a valid value so that can be certain the return value is not nil + // when no error has occurred. + if usage == nil { + return &UsageEntry{ID: id, Count: 0}, nil + } + + realUsage, ok := usage.(*UsageEntry) + if !ok { + return nil, fmt.Errorf("failed usage lookup: type %T is not *UsageEntry", usage) + } + + return realUsage, nil +} diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go new file mode 100644 index 0000000000..f35576abf5 --- /dev/null +++ b/agent/consul/state/usage_oss.go @@ -0,0 +1,15 @@ +// +build !consulent + +package state + +import ( + memdb "github.com/hashicorp/go-memdb" +) + +type EnterpriseServiceUsage struct{} + +func addEnterpriseServiceUsage(map[string]int, memdb.Change, uniqueServiceState) {} + +func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) { + return usage, nil +} diff --git a/agent/consul/state/usage_oss_test.go b/agent/consul/state/usage_oss_test.go new file mode 100644 index 0000000000..b441c71635 --- /dev/null +++ b/agent/consul/state/usage_oss_test.go @@ -0,0 +1,25 @@ +// +build !consulent + +package state + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStateStore_Usage_ServiceUsage(t *testing.T) { + s := testStateStore(t) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + testRegisterService(t, s, 8, "node1", "service1") + testRegisterService(t, s, 9, "node2", "service1") + testRegisterService(t, s, 10, "node2", "service2") + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(10)) + require.Equal(t, 2, usage.Services) + require.Equal(t, 3, usage.ServiceInstances) +} diff --git a/agent/consul/state/usage_test.go b/agent/consul/state/usage_test.go new file mode 100644 index 0000000000..f608d7d75c --- /dev/null +++ b/agent/consul/state/usage_test.go @@ -0,0 +1,194 @@ +package state + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/require" +) + +func TestStateStore_Usage_NodeCount(t *testing.T) { + s := testStateStore(t) + + // No nodes have been registered, and thus no usage entry exists + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(0)) + require.Equal(t, count, 0) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + + idx, count, err = s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(1)) + require.Equal(t, count, 2) +} + +func TestStateStore_Usage_NodeCount_Delete(t *testing.T) { + s := testStateStore(t) + + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(1)) + require.Equal(t, count, 2) + + require.NoError(t, s.DeleteNode(2, "node2")) + idx, count, err = s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(2)) + require.Equal(t, count, 1) +} + +func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) { + s := testStateStore(t) + + // No services have been registered, and thus no usage entry exists + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(0)) + require.Equal(t, usage.Services, 0) + require.Equal(t, usage.ServiceInstances, 0) +} + +func TestStateStore_Usage_Restore(t *testing.T) { + s := testStateStore(t) + restore := s.Restore() + restore.Registration(9, &structs.RegisterRequest{ + Node: "test-node", + Service: &structs.NodeService{ + ID: "mysql", + Service: "mysql", + Port: 8080, + Address: "198.18.0.2", + }, + }) + require.NoError(t, restore.Commit()) + + idx, count, err := s.NodeCount() + require.NoError(t, err) + require.Equal(t, idx, uint64(9)) + require.Equal(t, count, 1) +} + +func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) { + s := testStateStore(t) + txn := s.db.WriteTxn(1) + + // A single delete change will cause a negative count + changes := Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + }, + } + + err := updateUsage(txn, changes) + require.Error(t, err) + require.Contains(t, err.Error(), "negative count") + + // A insert a change to create a usage entry + changes = Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: nil, + After: &structs.Node{}, + }, + }, + } + + err = updateUsage(txn, changes) + require.NoError(t, err) + + // Two deletes will cause a negative count now + changes = Changes{ + Index: 1, + Changes: memdb.Changes{ + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + { + Table: "nodes", + Before: &structs.Node{}, + After: nil, + }, + }, + } + + err = updateUsage(txn, changes) + require.Error(t, err) + require.Contains(t, err.Error(), "negative count") +} + +func TestStateStore_Usage_ServiceUsage_updatingServiceName(t *testing.T) { + s := testStateStore(t) + testRegisterNode(t, s, 1, "node1") + testRegisterService(t, s, 1, "node1", "service1") + + t.Run("rename service with a single instance", func(t *testing.T) { + svc := &structs.NodeService{ + ID: "service1", + Service: "after", + Address: "1.1.1.1", + Port: 1111, + } + require.NoError(t, s.EnsureService(2, "node1", svc)) + + // We renamed a service with a single instance, so we maintain 1 service. + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(2)) + require.Equal(t, usage.Services, 1) + require.Equal(t, usage.ServiceInstances, 1) + }) + + t.Run("rename service with a multiple instances", func(t *testing.T) { + svc2 := &structs.NodeService{ + ID: "service2", + Service: "before", + Address: "1.1.1.2", + Port: 1111, + } + require.NoError(t, s.EnsureService(3, "node1", svc2)) + + svc3 := &structs.NodeService{ + ID: "service3", + Service: "before", + Address: "1.1.1.3", + Port: 1111, + } + require.NoError(t, s.EnsureService(4, "node1", svc3)) + + idx, usage, err := s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(4)) + require.Equal(t, usage.Services, 2) + require.Equal(t, usage.ServiceInstances, 3) + + update := &structs.NodeService{ + ID: "service2", + Service: "another-name", + Address: "1.1.1.2", + Port: 1111, + } + require.NoError(t, s.EnsureService(5, "node1", update)) + + idx, usage, err = s.ServiceUsage() + require.NoError(t, err) + require.Equal(t, idx, uint64(5)) + require.Equal(t, usage.Services, 3) + require.Equal(t, usage.ServiceInstances, 3) + }) +} diff --git a/agent/consul/usagemetrics/usagemetrics.go b/agent/consul/usagemetrics/usagemetrics.go new file mode 100644 index 0000000000..18b36cfd69 --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics.go @@ -0,0 +1,135 @@ +package usagemetrics + +import ( + "context" + "errors" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/logging" + "github.com/hashicorp/go-hclog" +) + +// Config holds the settings for various parameters for the +// UsageMetricsReporter +type Config struct { + logger hclog.Logger + metricLabels []metrics.Label + stateProvider StateProvider + tickerInterval time.Duration +} + +// WithDatacenter adds the datacenter as a label to all metrics emitted by the +// UsageMetricsReporter +func (c *Config) WithDatacenter(dc string) *Config { + c.metricLabels = append(c.metricLabels, metrics.Label{Name: "datacenter", Value: dc}) + return c +} + +// WithLogger takes a logger and creates a new, named sub-logger to use when +// running +func (c *Config) WithLogger(logger hclog.Logger) *Config { + c.logger = logger.Named(logging.UsageMetrics) + return c +} + +// WithReportingInterval specifies the interval on which UsageMetricsReporter +// should emit metrics +func (c *Config) WithReportingInterval(dur time.Duration) *Config { + c.tickerInterval = dur + return c +} + +func (c *Config) WithStateProvider(sp StateProvider) *Config { + c.stateProvider = sp + return c +} + +// StateProvider defines an inteface for retrieving a state.Store handle. In +// non-test code, this is satisfied by the fsm.FSM struct. +type StateProvider interface { + State() *state.Store +} + +// UsageMetricsReporter provides functionality for emitting usage metrics into +// the metrics stream. This makes it essentially a translation layer +// between the state store and metrics stream. +type UsageMetricsReporter struct { + logger hclog.Logger + metricLabels []metrics.Label + stateProvider StateProvider + tickerInterval time.Duration +} + +func NewUsageMetricsReporter(cfg *Config) (*UsageMetricsReporter, error) { + if cfg.stateProvider == nil { + return nil, errors.New("must provide a StateProvider to usage reporter") + } + + if cfg.logger == nil { + cfg.logger = hclog.NewNullLogger() + } + + if cfg.tickerInterval == 0 { + // Metrics are aggregated every 10 seconds, so we default to that. + cfg.tickerInterval = 10 * time.Second + } + + u := &UsageMetricsReporter{ + logger: cfg.logger, + stateProvider: cfg.stateProvider, + metricLabels: cfg.metricLabels, + tickerInterval: cfg.tickerInterval, + } + + return u, nil +} + +// Run must be run in a goroutine, and can be stopped by closing or sending +// data to the passed in shutdownCh +func (u *UsageMetricsReporter) Run(ctx context.Context) { + ticker := time.NewTicker(u.tickerInterval) + for { + select { + case <-ctx.Done(): + u.logger.Debug("usage metrics reporter shutting down") + ticker.Stop() + return + case <-ticker.C: + u.runOnce() + } + } +} + +func (u *UsageMetricsReporter) runOnce() { + state := u.stateProvider.State() + _, nodes, err := state.NodeCount() + if err != nil { + u.logger.Warn("failed to retrieve nodes from state store", "error", err) + } + metrics.SetGaugeWithLabels( + []string{"consul", "state", "nodes"}, + float32(nodes), + u.metricLabels, + ) + + _, serviceUsage, err := state.ServiceUsage() + if err != nil { + u.logger.Warn("failed to retrieve services from state store", "error", err) + } + + metrics.SetGaugeWithLabels( + []string{"consul", "state", "services"}, + float32(serviceUsage.Services), + u.metricLabels, + ) + + metrics.SetGaugeWithLabels( + []string{"consul", "state", "service_instances"}, + float32(serviceUsage.ServiceInstances), + u.metricLabels, + ) + + u.emitEnterpriseUsage(serviceUsage) +} diff --git a/agent/consul/usagemetrics/usagemetrics_oss.go b/agent/consul/usagemetrics/usagemetrics_oss.go new file mode 100644 index 0000000000..37d71b83f8 --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics_oss.go @@ -0,0 +1,7 @@ +// +build !consulent + +package usagemetrics + +import "github.com/hashicorp/consul/agent/consul/state" + +func (u *UsageMetricsReporter) emitEnterpriseUsage(state.ServiceUsage) {} diff --git a/agent/consul/usagemetrics/usagemetrics_oss_test.go b/agent/consul/usagemetrics/usagemetrics_oss_test.go new file mode 100644 index 0000000000..3d5263c0b2 --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics_oss_test.go @@ -0,0 +1,9 @@ +// +build !consulent + +package usagemetrics + +import "github.com/hashicorp/consul/agent/consul/state" + +func newStateStore() (*state.Store, error) { + return state.NewStateStore(nil) +} diff --git a/agent/consul/usagemetrics/usagemetrics_test.go b/agent/consul/usagemetrics/usagemetrics_test.go new file mode 100644 index 0000000000..c293cbb1de --- /dev/null +++ b/agent/consul/usagemetrics/usagemetrics_test.go @@ -0,0 +1,128 @@ +package usagemetrics + +import ( + "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type mockStateProvider struct { + mock.Mock +} + +func (m *mockStateProvider) State() *state.Store { + retValues := m.Called() + return retValues.Get(0).(*state.Store) +} + +func TestUsageReporter_Run(t *testing.T) { + type testCase struct { + modfiyStateStore func(t *testing.T, s *state.Store) + expectedGauges map[string]metrics.GaugeValue + } + cases := map[string]testCase{ + "empty-state": { + expectedGauges: map[string]metrics.GaugeValue{ + "consul.usage.test.consul.state.nodes;datacenter=dc1": { + Name: "consul.usage.test.consul.state.nodes", + Value: 0, + Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}}, + }, + "consul.usage.test.consul.state.services;datacenter=dc1": { + Name: "consul.usage.test.consul.state.services", + Value: 0, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + "consul.usage.test.consul.state.service_instances;datacenter=dc1": { + Name: "consul.usage.test.consul.state.service_instances", + Value: 0, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + }, + }, + "nodes-and-services": { + modfiyStateStore: func(t *testing.T, s *state.Store) { + require.Nil(t, s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})) + require.Nil(t, s.EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.2"})) + require.Nil(t, s.EnsureNode(3, &structs.Node{Node: "baz", Address: "127.0.0.2"})) + + // Typical services and some consul services spread across two nodes + require.Nil(t, s.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000})) + require.Nil(t, s.EnsureService(5, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000})) + require.Nil(t, s.EnsureService(6, "foo", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil})) + require.Nil(t, s.EnsureService(7, "bar", &structs.NodeService{ID: "consul", Service: "consul", Tags: nil})) + }, + expectedGauges: map[string]metrics.GaugeValue{ + "consul.usage.test.consul.state.nodes;datacenter=dc1": { + Name: "consul.usage.test.consul.state.nodes", + Value: 3, + Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}}, + }, + "consul.usage.test.consul.state.services;datacenter=dc1": { + Name: "consul.usage.test.consul.state.services", + Value: 3, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + "consul.usage.test.consul.state.service_instances;datacenter=dc1": { + Name: "consul.usage.test.consul.state.service_instances", + Value: 4, + Labels: []metrics.Label{ + {Name: "datacenter", Value: "dc1"}, + }, + }, + }, + }, + } + + for name, tcase := range cases { + t.Run(name, func(t *testing.T) { + // Only have a single interval for the test + sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute) + cfg := metrics.DefaultConfig("consul.usage.test") + cfg.EnableHostname = false + metrics.NewGlobal(cfg, sink) + + mockStateProvider := &mockStateProvider{} + s, err := newStateStore() + require.NoError(t, err) + if tcase.modfiyStateStore != nil { + tcase.modfiyStateStore(t, s) + } + mockStateProvider.On("State").Return(s) + + reporter, err := NewUsageMetricsReporter( + new(Config). + WithStateProvider(mockStateProvider). + WithLogger(testutil.Logger(t)). + WithDatacenter("dc1"), + ) + require.NoError(t, err) + + reporter.runOnce() + + intervals := sink.Data() + require.Len(t, intervals, 1) + intv := intervals[0] + + // Range over the expected values instead of just doing an Equal + // comparison on the maps because of different metrics emitted between + // OSS and Ent. The enterprise tests have a full equality comparison on + // the maps. + for key, expected := range tcase.expectedGauges { + require.Equal(t, expected, intv.Gauges[key]) + } + }) + } +} diff --git a/logging/names.go b/logging/names.go index 6ade11bf69..02c0fbf69f 100644 --- a/logging/names.go +++ b/logging/names.go @@ -51,6 +51,7 @@ const ( TerminatingGateway string = "terminating_gateway" TLSUtil string = "tlsutil" Transaction string = "txn" + UsageMetrics string = "usage_metrics" WAN string = "wan" Watch string = "watch" ) diff --git a/website/pages/docs/agent/telemetry.mdx b/website/pages/docs/agent/telemetry.mdx index 449997fcd7..ea40e2ee64 100644 --- a/website/pages/docs/agent/telemetry.mdx +++ b/website/pages/docs/agent/telemetry.mdx @@ -171,6 +171,9 @@ This is a full list of metrics emitted by Consul. | `consul.runtime.num_goroutines` | This tracks the number of running goroutines and is a general load pressure indicator. This may burst from time to time but should return to a steady state value. | number of goroutines | gauge | | `consul.runtime.alloc_bytes` | This measures the number of bytes allocated by the Consul process. This may burst from time to time but should return to a steady state value. | bytes | gauge | | `consul.runtime.heap_objects` | This measures the number of objects allocated on the heap and is a general memory pressure indicator. This may burst from time to time but should return to a steady state value. | number of objects | gauge | +| `consul.state.nodes` | This meansures the current number of nodes registered with Consul. It is only emitted by Consul servers. | number of objects | gauge | +| `consul.state.services` | This meansures the current number of unique services registered with Consul, based on service name. It is only emitted by Consul servers. | number of objects | gauge | +| `consul.state.service_instances` | This meansures the current number of unique service instances registered with Consul. It is only emitted by Consul servers. | number of objects | gauge | | `consul.acl.cache_hit` | The number of ACL cache hits. | hits | counter | | `consul.acl.cache_miss` | The number of ACL cache misses. | misses | counter | | `consul.acl.replication_hit` | The number of ACL replication cache hits (when not running in the ACL datacenter). | hits | counter |