@@ -186,6 +186,9 @@ type StorageClient interface {
 // indicated by the provided StorageClient. Implements writeTo interface
 // used by WAL Watcher.
 type QueueManager struct {
+	// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
+	lastSendTimestamp int64
+
 	logger         log.Logger
 	flushDeadline  time.Duration
 	cfg            config.QueueConfig
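A note on the new field: it sits first in the struct and is only touched through sync/atomic because, per the linked pkg-note-BUG, 64-bit atomic operations require 64-bit alignment and the first word of an allocated struct is guaranteed to be aligned. A minimal sketch of that pattern, using an invented `queue` type rather than the PR's QueueManager:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type queue struct {
	// Keep the atomically accessed int64 first so it stays 64-bit aligned
	// on 32-bit platforms (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
	lastSendTimestamp int64

	name string // other fields follow
}

func main() {
	q := &queue{name: "example"}
	// Writer side: record a successful send.
	atomic.StoreInt64(&q.lastSendTimestamp, time.Now().Unix())
	// Reader side: observe the value without a data race.
	fmt.Println(q.name, atomic.LoadInt64(&q.lastSendTimestamp))
}
```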
@@ -479,14 +482,30 @@ func (t *QueueManager) updateShardsLoop() {
 	for {
 		select {
 		case <-ticker.C:
-			t.calculateDesiredShards()
+			desiredShards := t.calculateDesiredShards()
+			if desiredShards == t.numShards {
+				continue
+			}
+			// Resharding can take some time, and we want this loop
+			// to stay close to shardUpdateDuration.
+			select {
+			case t.reshardChan <- desiredShards:
+				level.Info(t.logger).Log("msg", "Remote storage resharding", "from", t.numShards, "to", desiredShards)
+				t.numShards = desiredShards
+			default:
+				level.Info(t.logger).Log("msg", "Currently resharding, skipping.")
+			}
 		case <-t.quit:
 			return
 		}
 	}
 }
 
-func (t *QueueManager) calculateDesiredShards() {
+// calculateDesiredShards returns the number of desired shards, which will be
+// the current QueueManager.numShards if resharding should not occur for reasons
+// outlined in this function's implementation. It is up to the caller to reshard, or not,
+// based on the return value.
+func (t *QueueManager) calculateDesiredShards() int {
 	t.samplesOut.tick()
 	t.samplesDropped.tick()
 	t.samplesOutDuration.tick()
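The `select` with a `default` branch above is a non-blocking hand-off: a reshard that is still in progress never stalls the ticker loop, it just gets skipped until the next tick. A standalone sketch of the same pattern, with invented names and timings:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	reshardCh := make(chan int) // unbuffered, like reshardChan
	quit := make(chan struct{})

	// Worker that applies reshard requests slowly.
	go func() {
		for n := range reshardCh {
			fmt.Println("resharding to", n, "shards")
			time.Sleep(300 * time.Millisecond) // simulate a slow reshard
		}
	}()

	go func() {
		time.Sleep(time.Second)
		close(quit)
	}()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	desired := 0
	for {
		select {
		case <-ticker.C:
			desired++
			// Non-blocking send: stay close to the ticker interval even
			// when the worker is busy.
			select {
			case reshardCh <- desired:
				fmt.Println("requested", desired)
			default:
				fmt.Println("currently resharding, skipping")
			}
		case <-quit:
			return
		}
	}
}
```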
@@ -507,7 +526,7 @@ func (t *QueueManager) calculateDesiredShards() {
 	)
 
 	if samplesOutRate <= 0 {
-		return
+		return t.numShards
 	}
 
 	// We use an integral accumulator, like in a PID, to help dampen
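For context on the integral-accumulator comment above: the accumulator is the "I" term of a PID-style controller, folding the pending-samples error into the shard calculation so a persistent backlog keeps nudging the output rather than causing oscillation. A rough sketch using the `desiredShards = timePerSample * (samplesInRate + integralAccumulator)` formula from the next hunk, with made-up gain and rates:

```go
package main

import "fmt"

func main() {
	const integralGain = 0.1 // assumed gain, for illustration only
	var accumulator float64

	// Pretend the queue stays 500 samples/s behind for a few ticks.
	for tick := 1; tick <= 5; tick++ {
		samplesPendingRate := 500.0
		accumulator += samplesPendingRate * integralGain

		samplesInRate := 10000.0 // made-up ingest rate
		timePerSample := 0.0004  // made-up seconds spent per sample sent
		desiredShards := timePerSample * (samplesInRate + accumulator)
		fmt.Printf("tick %d: accumulator=%.0f desiredShards=%.2f\n",
			tick, accumulator, desiredShards)
	}
}
```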
@@ -523,6 +542,15 @@ func (t *QueueManager) calculateDesiredShards() {
 	}
 	t.integralAccumulator += samplesPendingRate * integralGain
 
+	// We shouldn't reshard if Prometheus hasn't been able to send to the
+	// remote endpoint successfully within some period of time.
+	minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
+	lsts := atomic.LoadInt64(&t.lastSendTimestamp)
+	if lsts < minSendTimestamp {
+		level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
+		return t.numShards
+	}
+
 	var (
 		timePerSample = samplesOutDuration / samplesOutRate
 		desiredShards = timePerSample * (samplesInRate + t.integralAccumulator)
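The guard added above skips resharding when the last successful send is older than twice BatchSendDeadline, since scaling shards up or down cannot help while the remote end is rejecting or timing out. A small self-contained sketch of that check; the deadline value and helper are invented for illustration:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// lastSendTimestamp holds Unix seconds of the last successful send,
// written atomically by the send path.
var lastSendTimestamp int64

func shouldReshard(batchSendDeadline time.Duration) bool {
	minSendTimestamp := time.Now().Add(-2 * batchSendDeadline).Unix()
	return atomic.LoadInt64(&lastSendTimestamp) >= minSendTimestamp
}

func main() {
	deadline := 5 * time.Second // hypothetical BatchSendDeadline

	// Last success a minute ago: too stale, keep the current shard count.
	atomic.StoreInt64(&lastSendTimestamp, time.Now().Add(-time.Minute).Unix())
	fmt.Println("stale send, reshard?", shouldReshard(deadline)) // false

	// Fresh success: resharding is allowed again.
	atomic.StoreInt64(&lastSendTimestamp, time.Now().Unix())
	fmt.Println("fresh send, reshard?", shouldReshard(deadline)) // true
}
```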
@@ -549,7 +577,7 @@ func (t *QueueManager) calculateDesiredShards() {
 	level.Debug(t.logger).Log("msg", "QueueManager.updateShardsLoop",
 		"lowerBound", lowerBound, "desiredShards", desiredShards, "upperBound", upperBound)
 	if lowerBound <= desiredShards && desiredShards <= upperBound {
-		return
+		return t.numShards
 	}
 
 	numShards := int(math.Ceil(desiredShards))
@@ -559,19 +587,7 @@ func (t *QueueManager) calculateDesiredShards() {
 	} else if numShards < t.cfg.MinShards {
 		numShards = t.cfg.MinShards
 	}
-	if numShards == t.numShards {
-		return
-	}
-
-	// Resharding can take some time, and we want this loop
-	// to stay close to shardUpdateDuration.
-	select {
-	case t.reshardChan <- numShards:
-		level.Info(t.logger).Log("msg", "Remote storage resharding", "from", t.numShards, "to", numShards)
-		t.numShards = numShards
-	default:
-		level.Info(t.logger).Log("msg", "Currently resharding, skipping.")
-	}
+	return numShards
 }
 
 func (t *QueueManager) reshardLoop() {
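Right before the new `return numShards`, the desired value is rounded up with math.Ceil and clamped to the configured MinShards (and, presumably in the branch just above this hunk's context, MaxShards). A trivial sketch of that clamping with placeholder limits:

```go
package main

import (
	"fmt"
	"math"
)

// clampShards rounds the fractional desired value up and bounds it by the
// configured limits; minShards/maxShards here stand in for cfg.MinShards
// and cfg.MaxShards.
func clampShards(desired float64, minShards, maxShards int) int {
	n := int(math.Ceil(desired))
	if n > maxShards {
		n = maxShards
	} else if n < minShards {
		n = minShards
	}
	return n
}

func main() {
	fmt.Println(clampShards(3.2, 1, 200))   // 4
	fmt.Println(clampShards(0.4, 1, 200))   // 1, floored at MinShards
	fmt.Println(clampShards(512.7, 1, 200)) // 200, capped at MaxShards
}
```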
@@ -813,6 +829,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 		if err == nil {
 			s.qm.succeededSamplesTotal.Add(float64(len(samples)))
 			s.qm.highestSentTimestampMetric.Set(float64(highest / 1000))
+			atomic.StoreInt64(&s.qm.lastSendTimestamp, time.Now().Unix())
 			return nil
 		}
 
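Finally, the write side: the timestamp only advances when a send succeeds, so a persistently failing remote write leaves it stale and the resharding guard from the earlier hunk takes effect. A sketch of that success-only update, with an invented sendFn wrapper:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

var lastSendTimestamp int64 // Unix seconds of the last successful send

// trySend is an invented wrapper: it advances the timestamp only when the
// underlying send succeeds, mirroring the err == nil branch above.
func trySend(sendFn func() error) error {
	if err := sendFn(); err != nil {
		return err // failed sends leave the timestamp untouched
	}
	atomic.StoreInt64(&lastSendTimestamp, time.Now().Unix())
	return nil
}

func main() {
	_ = trySend(func() error { return errors.New("remote write failed") })
	fmt.Println("after failure:", atomic.LoadInt64(&lastSendTimestamp)) // still 0

	_ = trySend(func() error { return nil })
	fmt.Println("after success:", atomic.LoadInt64(&lastSendTimestamp)) // now set
}
```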