@ -61,6 +61,7 @@ const (
// usage metrics that we track.
func updateUsage ( tx WriteTxn , changes Changes ) error {
usageDeltas := make ( map [ string ] int )
serviceNameChanges := make ( map [ structs . ServiceName ] int )
for _ , change := range changes . Changes {
var delta int
if change . Created ( ) {
@ -75,73 +76,75 @@ func updateUsage(tx WriteTxn, changes Changes) error {
case "services" :
svc := changeObject ( change ) . ( * structs . ServiceNode )
usageDeltas [ change . Table ] += delta
serviceIter , err := getWithTxn ( tx , servicesTableName , "service" , svc . ServiceName , & svc . EnterpriseMeta )
addEnterpriseServiceInstanceUsage ( usageDeltas , change )
// Construct a mapping of all of the various service names that were
// changed, in order to compare it with the finished memdb state.
// Make sure to account for the fact that services can change their names.
if serviceNameChanged ( change ) {
serviceNameChanges [ svc . CompoundServiceName ( ) ] += 1
before := change . Before . ( * structs . ServiceNode )
serviceNameChanges [ before . CompoundServiceName ( ) ] -= 1
} else {
serviceNameChanges [ svc . CompoundServiceName ( ) ] += delta
}
}
}
serviceStates , err := updateServiceNameUsage ( tx , usageDeltas , serviceNameChanges )
if err != nil {
return err
}
addEnterpriseServiceUsage ( usageDeltas , serviceStates )
idx := changes . Index
// This will happen when restoring from a snapshot, just take the max index
// of the tables we are tracking.
if idx == 0 {
idx = maxIndexTxn ( tx , "nodes" , servicesTableName )
}
return writeUsageDeltas ( tx , idx , usageDeltas )
}
func updateServiceNameUsage ( tx WriteTxn , usageDeltas map [ string ] int , serviceNameChanges map [ structs . ServiceName ] int ) ( map [ structs . ServiceName ] uniqueServiceState , error ) {
serviceStates := make ( map [ structs . ServiceName ] uniqueServiceState , len ( serviceNameChanges ) )
for svc , delta := range serviceNameChanges {
serviceIter , err := getWithTxn ( tx , servicesTableName , "service" , svc . Name , & svc . EnterpriseMeta )
if err != nil {
return nil , err
}
// Count the number of service instances associated with the given service
// name at the end of this transaction, and compare that with how many were
// added/removed during the transaction. This allows us to handle a single
// transaction committing multiple changes related to a single service
// name.
var svcCount int
for service := serviceIter . Next ( ) ; service != nil ; service = serviceIter . Next ( ) {
svcCount += 1
}
var serviceState uniqueServiceState
if serviceIter . Next ( ) == nil {
switch {
case svcCount == 0 :
// If no services exist, we know we deleted the last service
// instance.
serviceState = Deleted
usageDeltas [ serviceNamesUsageTable ] -= 1
} else if serviceIter . Next ( ) == nil {
// If a second call to Next() returns nil, we know only a single
// instance exists. If, in addition, a new service name has been
// registered, either via creating a new service instance or via
// renaming an existing service, than we update our service count.
//
// We only care about two cases here:
// 1. A new service instance has been created with a unique name
// 2. An existing service instance has been updated with a new unique name
//
// These are the only ways a new unique service can be created. The
// other valid cases here: an update that does not change the service
// name, and a deletion, both do not impact the count of unique service
// names in the system.
if change . Created ( ) {
// Given a single existing service instance of the service: If a
// service has just been created, then we know this is a new unique
// service.
case svcCount == delta :
// If the current number of service instances equals the number added,
// than we know we created a new service name.
serviceState = Created
usageDeltas [ serviceNamesUsageTable ] += 1
} else if serviceNameChanged ( change ) {
// Given a single existing service instance of the service: If a
// service has been updated with a new service name, then we know
// this is a new unique service.
serviceState = Created
usageDeltas [ serviceNamesUsageTable ] += 1
// Check whether the previous name was deleted in this rename, this
// is a special case of renaming a service which does not result in
// changing the count of unique service names.
before := change . Before . ( * structs . ServiceNode )
beforeSvc , err := firstWithTxn ( tx , servicesTableName , "service" , before . ServiceName , & before . EnterpriseMeta )
if err != nil {
return err
}
if beforeSvc == nil {
usageDeltas [ serviceNamesUsageTable ] -= 1
// set serviceState to NoChange since we have both gained and lost a
// service, cancelling each other out
default :
serviceState = NoChange
}
}
}
addEnterpriseServiceUsage ( usageDeltas , change , serviceState )
}
}
idx := changes . Index
// This will happen when restoring from a snapshot, just take the max index
// of the tables we are tracking.
if idx == 0 {
idx = maxIndexTxn ( tx , "nodes" , servicesTableName )
serviceStates [ svc ] = serviceState
}
return writeUsageDeltas ( tx , idx , usageDeltas )
return serviceStates , nil
}
// serviceNameChanged returns a boolean that indicates whether the
@ -168,7 +171,11 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
if u == nil {
if delta < 0 {
return fmt . Errorf ( "failed to insert usage entry for %q: delta will cause a negative count" , id )
// Don't return an error here, since we don't want to block updates
// from happening to the state store. But, set the delta to 0 so that
// we do not accidentally underflow the uint64 and begin reporting
// large numbers.
delta = 0
}
err := tx . Insert ( "usage" , & UsageEntry {
ID : id ,
@ -179,12 +186,17 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
return fmt . Errorf ( "failed to update usage entry: %s" , err )
}
} else if cur , ok := u . ( * UsageEntry ) ; ok {
if cur . Count + delta < 0 {
return fmt . Errorf ( "failed to insert usage entry for %q: delta will cause a negative count" , id )
updated := cur . Count + delta
if updated < 0 {
// Don't return an error here, since we don't want to block updates
// from happening to the state store. But, set the delta to 0 so that
// we do not accidentally underflow the uint64 and begin reporting
// large numbers.
updated = 0
}
err := tx . Insert ( "usage" , & UsageEntry {
ID : id ,
Count : cur . Count + delta ,
Count : updated ,
Index : idx ,
} )
if err != nil {