mirror of https://github.com/hashicorp/consul
feat: backport service instances table to 1.14.x (#16819)
parent
ecc6a7786d
commit
1af132288e
|
@ -52,14 +52,6 @@ type UsageEntry struct {
|
|||
Count int
|
||||
}
|
||||
|
||||
// ServiceUsage contains all of the usage data related to services
|
||||
type ServiceUsage struct {
|
||||
Services int
|
||||
ServiceInstances int
|
||||
ConnectServiceInstances map[string]int
|
||||
EnterpriseServiceUsage
|
||||
}
|
||||
|
||||
// NodeUsage contains all of the usage data related to nodes
|
||||
type NodeUsage struct {
|
||||
Nodes int
|
||||
|
@ -128,6 +120,8 @@ func updateUsage(tx WriteTxn, changes Changes) error {
|
|||
addEnterpriseServiceInstanceUsage(usageDeltas, change)
|
||||
|
||||
connectDeltas(change, usageDeltas, delta)
|
||||
billableServiceInstancesDeltas(change, usageDeltas, delta)
|
||||
|
||||
// Construct a mapping of all of the various service names that were
|
||||
// changed, in order to compare it with the finished memdb state.
|
||||
// Make sure to account for the fact that services can change their names.
|
||||
|
@ -271,6 +265,53 @@ func connectDeltas(change memdb.Change, usageDeltas map[string]int, delta int) {
|
|||
}
|
||||
}
|
||||
|
||||
// billableServiceInstancesDeltas calculates deltas for the billable services. Billable services
|
||||
// are of "typical" service kind (i.e. non-connect or connect-native), excluding the "consul" service.
|
||||
func billableServiceInstancesDeltas(change memdb.Change, usageDeltas map[string]int, delta int) {
|
||||
// Billable service instances = # of typical service instances (i.e. non-connect) + connect-native service instances.
|
||||
// Specifically, it should exclude "consul" service instances from the count.
|
||||
//
|
||||
// If the service has been updated, then we check
|
||||
// 1. If the service name changed to or from "consul" and update deltas such that we exclude consul server service instances.
|
||||
// This case is a bit contrived because we don't expect consul service to change once it's registered (beyond changing its instance count).
|
||||
// a) If changed to "consul" -> decrement deltas by one
|
||||
// b) If changed from "consul" and it's not a "connect" service -> increase deltas by one
|
||||
// 2. If the service kind changed to or from "typical", we need to we need to update deltas so that we only account
|
||||
// for non-connect or connect-native instances.
|
||||
if change.Updated() {
|
||||
// When there's an update, the delta arg passed to this function is 0, and so we need to explicitly increment
|
||||
// or decrement by 1 depending on the situation.
|
||||
before := change.Before.(*structs.ServiceNode)
|
||||
after := change.After.(*structs.ServiceNode)
|
||||
// Service name changed away from "consul" means we now need to account for this service instances unless it's a "connect" service.
|
||||
if before.ServiceName == structs.ConsulServiceName && after.ServiceName != structs.ConsulServiceName {
|
||||
if after.ServiceKind == structs.ServiceKindTypical {
|
||||
usageDeltas[billableServiceInstancesTableName()] += 1
|
||||
addEnterpriseBillableServiceInstanceUsage(usageDeltas, after, 1)
|
||||
}
|
||||
}
|
||||
if before.ServiceName != structs.ConsulServiceName && after.ServiceName == structs.ConsulServiceName {
|
||||
usageDeltas[billableServiceInstancesTableName()] -= 1
|
||||
addEnterpriseBillableServiceInstanceUsage(usageDeltas, before, -1)
|
||||
}
|
||||
|
||||
if before.ServiceKind != structs.ServiceKindTypical && after.ServiceKind == structs.ServiceKindTypical {
|
||||
usageDeltas[billableServiceInstancesTableName()] += 1
|
||||
addEnterpriseBillableServiceInstanceUsage(usageDeltas, after, 1)
|
||||
} else if before.ServiceKind == structs.ServiceKindTypical && after.ServiceKind != structs.ServiceKindTypical {
|
||||
usageDeltas[billableServiceInstancesTableName()] -= 1
|
||||
addEnterpriseBillableServiceInstanceUsage(usageDeltas, before, -1)
|
||||
}
|
||||
} else {
|
||||
svc := changeObject(change).(*structs.ServiceNode)
|
||||
// If it's not an update, only update delta if it's a typical service and not the "consul" service.
|
||||
if svc.ServiceKind == structs.ServiceKindTypical && svc.ServiceName != structs.ConsulServiceName {
|
||||
usageDeltas[billableServiceInstancesTableName()] += delta
|
||||
addEnterpriseBillableServiceInstanceUsage(usageDeltas, svc, delta)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// writeUsageDeltas will take in a map of IDs to deltas and update each
|
||||
// entry accordingly, checking for integer underflow. The index that is
|
||||
// passed in will be recorded on the entry as well.
|
||||
|
@ -289,7 +330,7 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
|
|||
// large numbers.
|
||||
delta = 0
|
||||
}
|
||||
err := tx.Insert(tableUsage, &UsageEntry{
|
||||
err = tx.Insert(tableUsage, &UsageEntry{
|
||||
ID: id,
|
||||
Count: delta,
|
||||
Index: idx,
|
||||
|
@ -365,37 +406,43 @@ func (s *Store) PeeringUsage() (uint64, PeeringUsage, error) {
|
|||
|
||||
// ServiceUsage returns the latest seen Raft index, a compiled set of service
|
||||
// usage data, and any errors.
|
||||
func (s *Store) ServiceUsage(ws memdb.WatchSet) (uint64, ServiceUsage, error) {
|
||||
func (s *Store) ServiceUsage(ws memdb.WatchSet) (uint64, structs.ServiceUsage, error) {
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
serviceInstances, err := firstUsageEntry(ws, tx, tableServices)
|
||||
if err != nil {
|
||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
||||
services, err := firstUsageEntry(ws, tx, serviceNamesUsageTable)
|
||||
if err != nil {
|
||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
||||
serviceKindInstances := make(map[string]int)
|
||||
for _, kind := range allConnectKind {
|
||||
usage, err := firstUsageEntry(ws, tx, connectUsageTableName(kind))
|
||||
if err != nil {
|
||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
serviceKindInstances[kind] = usage.Count
|
||||
}
|
||||
|
||||
usage := ServiceUsage{
|
||||
ServiceInstances: serviceInstances.Count,
|
||||
Services: services.Count,
|
||||
ConnectServiceInstances: serviceKindInstances,
|
||||
billableServiceInstances, err := firstUsageEntry(ws, tx, billableServiceInstancesTableName())
|
||||
if err != nil {
|
||||
return 0, structs.ServiceUsage{}, fmt.Errorf("failed billable services lookup: %s", err)
|
||||
}
|
||||
|
||||
usage := structs.ServiceUsage{
|
||||
ServiceInstances: serviceInstances.Count,
|
||||
Services: services.Count,
|
||||
ConnectServiceInstances: serviceKindInstances,
|
||||
BillableServiceInstances: billableServiceInstances.Count,
|
||||
}
|
||||
results, err := compileEnterpriseServiceUsage(ws, tx, usage)
|
||||
if err != nil {
|
||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
return 0, structs.ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||
}
|
||||
|
||||
return serviceInstances.Index, results, nil
|
||||
|
@ -469,3 +516,7 @@ func firstUsageEntry(ws memdb.WatchSet, tx ReadTxn, id string) (*UsageEntry, err
|
|||
|
||||
return realUsage, nil
|
||||
}
|
||||
|
||||
func billableServiceInstancesTableName() string {
|
||||
return fmt.Sprintf("billable-%s", tableServices)
|
||||
}
|
||||
|
|
|
@ -25,11 +25,13 @@ func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueSer
|
|||
|
||||
func addEnterpriseConnectServiceInstanceUsage(map[string]int, *structs.ServiceNode, int) {}
|
||||
|
||||
func addEnterpriseBillableServiceInstanceUsage(map[string]int, *structs.ServiceNode, int) {}
|
||||
|
||||
func addEnterpriseKVUsage(map[string]int, memdb.Change) {}
|
||||
|
||||
func addEnterpriseConfigEntryUsage(map[string]int, memdb.Change) {}
|
||||
|
||||
func compileEnterpriseServiceUsage(ws memdb.WatchSet, tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
|
||||
func compileEnterpriseServiceUsage(ws memdb.WatchSet, tx ReadTxn, usage structs.ServiceUsage) (structs.ServiceUsage, error) {
|
||||
return usage, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -160,6 +160,7 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
|
|||
for k := range usage.ConnectServiceInstances {
|
||||
require.Equal(t, 0, usage.ConnectServiceInstances[k])
|
||||
}
|
||||
require.Equal(t, 0, usage.BillableServiceInstances)
|
||||
}
|
||||
|
||||
func TestStateStore_Usage_ServiceUsage(t *testing.T) {
|
||||
|
@ -184,6 +185,7 @@ func TestStateStore_Usage_ServiceUsage(t *testing.T) {
|
|||
require.Equal(t, 8, usage.ServiceInstances)
|
||||
require.Equal(t, 2, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
|
||||
require.Equal(t, 3, usage.ConnectServiceInstances[connectNativeInstancesTable])
|
||||
require.Equal(t, 6, usage.BillableServiceInstances)
|
||||
|
||||
testRegisterSidecarProxy(t, s, 16, "node2", "service2")
|
||||
|
||||
|
@ -225,6 +227,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
|
|||
require.Equal(t, 4, usage.ServiceInstances)
|
||||
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
|
||||
require.Equal(t, 1, usage.ConnectServiceInstances[connectNativeInstancesTable])
|
||||
require.Equal(t, 3, usage.BillableServiceInstances)
|
||||
|
||||
require.NoError(t, s.DeleteNode(4, "node1", nil, ""))
|
||||
|
||||
|
@ -236,6 +239,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
|
|||
for k := range usage.ConnectServiceInstances {
|
||||
require.Equal(t, 0, usage.ConnectServiceInstances[k])
|
||||
}
|
||||
require.Equal(t, 0, usage.BillableServiceInstances)
|
||||
}
|
||||
|
||||
// Test that services from remote peers aren't counted in writes or deletes.
|
||||
|
@ -263,6 +267,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
|
|||
require.Equal(t, 3, usage.ServiceInstances)
|
||||
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
|
||||
require.Equal(t, 1, usage.ConnectServiceInstances[connectNativeInstancesTable])
|
||||
require.Equal(t, 2, usage.BillableServiceInstances)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "deletes", func(t *testing.T) {
|
||||
|
@ -275,6 +280,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
|
|||
require.Equal(t, 0, usage.ServiceInstances)
|
||||
require.Equal(t, 0, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
|
||||
require.Equal(t, 0, usage.ConnectServiceInstances[connectNativeInstancesTable])
|
||||
require.Equal(t, 0, usage.BillableServiceInstances)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -311,6 +317,7 @@ func TestStateStore_Usage_Restore(t *testing.T) {
|
|||
require.Equal(t, idx, uint64(9))
|
||||
require.Equal(t, usage.Services, 1)
|
||||
require.Equal(t, usage.ServiceInstances, 2)
|
||||
require.Equal(t, usage.BillableServiceInstances, 2)
|
||||
}
|
||||
|
||||
func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
|
||||
|
@ -411,6 +418,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
|
|||
require.Equal(t, idx, uint64(2))
|
||||
require.Equal(t, usage.Services, 1)
|
||||
require.Equal(t, usage.ServiceInstances, 1)
|
||||
require.Equal(t, usage.BillableServiceInstances, 1)
|
||||
})
|
||||
|
||||
t.Run("update service to be connect native", func(t *testing.T) {
|
||||
|
@ -432,6 +440,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
|
|||
require.Equal(t, usage.Services, 1)
|
||||
require.Equal(t, usage.ServiceInstances, 1)
|
||||
require.Equal(t, 1, usage.ConnectServiceInstances[connectNativeInstancesTable])
|
||||
require.Equal(t, 1, usage.BillableServiceInstances)
|
||||
})
|
||||
|
||||
t.Run("update service to not be connect native", func(t *testing.T) {
|
||||
|
@ -453,6 +462,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
|
|||
require.Equal(t, usage.Services, 1)
|
||||
require.Equal(t, usage.ServiceInstances, 1)
|
||||
require.Equal(t, 0, usage.ConnectServiceInstances[connectNativeInstancesTable])
|
||||
require.Equal(t, 1, usage.BillableServiceInstances)
|
||||
})
|
||||
|
||||
t.Run("rename service with a multiple instances", func(t *testing.T) {
|
||||
|
@ -484,6 +494,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
|
|||
require.Equal(t, usage.Services, 2)
|
||||
require.Equal(t, usage.ServiceInstances, 3)
|
||||
require.Equal(t, 2, usage.ConnectServiceInstances[connectNativeInstancesTable])
|
||||
require.Equal(t, 3, usage.BillableServiceInstances)
|
||||
|
||||
update := &structs.NodeService{
|
||||
ID: "service2",
|
||||
|
@ -502,6 +513,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
|
|||
require.Equal(t, usage.Services, 3)
|
||||
require.Equal(t, usage.ServiceInstances, 3)
|
||||
require.Equal(t, 2, usage.ConnectServiceInstances[connectNativeInstancesTable])
|
||||
require.Equal(t, 3, usage.BillableServiceInstances)
|
||||
|
||||
})
|
||||
}
|
||||
|
@ -528,6 +540,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
|
|||
require.Equal(t, usage.Services, 1)
|
||||
require.Equal(t, usage.ServiceInstances, 1)
|
||||
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
|
||||
require.Equal(t, 0, usage.BillableServiceInstances)
|
||||
})
|
||||
|
||||
t.Run("rename service with a multiple instances", func(t *testing.T) {
|
||||
|
@ -554,6 +567,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
|
|||
require.Equal(t, usage.Services, 2)
|
||||
require.Equal(t, usage.ServiceInstances, 3)
|
||||
require.Equal(t, 2, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
|
||||
require.Equal(t, 1, usage.BillableServiceInstances)
|
||||
|
||||
update := &structs.NodeService{
|
||||
ID: "service3",
|
||||
|
@ -569,6 +583,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
|
|||
require.Equal(t, usage.Services, 3)
|
||||
require.Equal(t, usage.ServiceInstances, 3)
|
||||
require.Equal(t, 1, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
|
||||
require.Equal(t, 2, usage.BillableServiceInstances)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -88,6 +88,10 @@ var Gauges = []prometheus.GaugeDefinition{
|
|||
Name: []string{"state", "config_entries"},
|
||||
Help: "Measures the current number of unique configuration entries registered with Consul, labeled by Kind. It is only emitted by Consul servers. Added in v1.10.4.",
|
||||
},
|
||||
{
|
||||
Name: []string{"state", "billable_service_instances"},
|
||||
Help: "Total number of billable service instances in the local datacenter.",
|
||||
},
|
||||
}
|
||||
|
||||
type getMembersFunc func() []serf.Member
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/hashicorp/serf/serf"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func (u *UsageMetricsReporter) emitNodeUsage(nodeUsage state.NodeUsage) {
|
||||
|
@ -74,7 +75,7 @@ func (u *UsageMetricsReporter) emitMemberUsage(members []serf.Member) {
|
|||
)
|
||||
}
|
||||
|
||||
func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage state.ServiceUsage) {
|
||||
func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage structs.ServiceUsage) {
|
||||
metrics.SetGaugeWithLabels(
|
||||
[]string{"consul", "state", "services"},
|
||||
float32(serviceUsage.Services),
|
||||
|
@ -96,6 +97,11 @@ func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage state.ServiceUsage)
|
|||
float32(serviceUsage.ServiceInstances),
|
||||
u.metricLabels,
|
||||
)
|
||||
metrics.SetGaugeWithLabels(
|
||||
[]string{"state", "billable_service_instances"},
|
||||
float32(serviceUsage.BillableServiceInstances),
|
||||
u.metricLabels,
|
||||
)
|
||||
|
||||
for k, i := range serviceUsage.ConnectServiceInstances {
|
||||
metrics.SetGaugeWithLabels(
|
||||
|
|
|
@ -178,6 +178,13 @@ var baseCases = map[string]testCase{
|
|||
{Name: "kind", Value: "connect-native"},
|
||||
},
|
||||
},
|
||||
"consul.usage.test.state.billable_service_instances;datacenter=dc1": {
|
||||
Name: "consul.usage.test.state.billable_service_instances",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{
|
||||
{Name: "datacenter", Value: "dc1"},
|
||||
},
|
||||
},
|
||||
// --- kv ---
|
||||
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": { // Legacy
|
||||
Name: "consul.usage.test.consul.state.kv_entries",
|
||||
|
@ -518,6 +525,13 @@ var baseCases = map[string]testCase{
|
|||
{Name: "kind", Value: "connect-native"},
|
||||
},
|
||||
},
|
||||
"consul.usage.test.state.billable_service_instances;datacenter=dc1": {
|
||||
Name: "consul.usage.test.state.billable_service_instances",
|
||||
Value: 0,
|
||||
Labels: []metrics.Label{
|
||||
{Name: "datacenter", Value: "dc1"},
|
||||
},
|
||||
},
|
||||
// --- kv ---
|
||||
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": { // Legacy
|
||||
Name: "consul.usage.test.consul.state.kv_entries",
|
||||
|
@ -1016,6 +1030,13 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
|
|||
{Name: "kind", Value: "connect-native"},
|
||||
},
|
||||
}
|
||||
nodesAndSvcsCase.expectedGauges["consul.usage.test.state.billable_service_instances;datacenter=dc1"] = metrics.GaugeValue{
|
||||
Name: "consul.usage.test.state.billable_service_instances",
|
||||
Value: 3,
|
||||
Labels: []metrics.Label{
|
||||
{Name: "datacenter", Value: "dc1"},
|
||||
},
|
||||
}
|
||||
nodesAndSvcsCase.expectedGauges["consul.usage.test.consul.state.config_entries;datacenter=dc1;kind=ingress-gateway"] = metrics.GaugeValue{ // Legacy
|
||||
Name: "consul.usage.test.consul.state.config_entries",
|
||||
Value: 3,
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
"github.com/hashicorp/go-memdb"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
)
|
||||
|
@ -206,5 +205,5 @@ func (c *Controller) countProxies(ctx context.Context) (<-chan error, uint32, er
|
|||
|
||||
type Store interface {
|
||||
AbandonCh() <-chan struct{}
|
||||
ServiceUsage(ws memdb.WatchSet) (uint64, state.ServiceUsage, error)
|
||||
ServiceUsage(ws memdb.WatchSet) (uint64, structs.ServiceUsage, error)
|
||||
}
|
||||
|
|
|
@ -2250,6 +2250,21 @@ type IndexedServices struct {
|
|||
QueryMeta
|
||||
}
|
||||
|
||||
type Usage struct {
|
||||
Usage map[string]ServiceUsage
|
||||
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// ServiceUsage contains all of the usage data related to services
|
||||
type ServiceUsage struct {
|
||||
Services int
|
||||
ServiceInstances int
|
||||
ConnectServiceInstances map[string]int
|
||||
BillableServiceInstances int
|
||||
EnterpriseServiceUsage
|
||||
}
|
||||
|
||||
// PeeredServiceName is a basic tuple of ServiceName and peer
|
||||
type PeeredServiceName struct {
|
||||
ServiceName ServiceName
|
||||
|
|
|
@ -169,3 +169,5 @@ func (t *Intention) HasWildcardDestination() bool {
|
|||
func (s *ServiceNode) NodeIdentity() Identity {
|
||||
return Identity{ID: s.Node}
|
||||
}
|
||||
|
||||
type EnterpriseServiceUsage struct{}
|
||||
|
|
Loading…
Reference in New Issue