@ -158,18 +158,13 @@ type StorageClient interface {
// indicated by the provided StorageClient. Implements writeTo interface
// indicated by the provided StorageClient. Implements writeTo interface
// used by WAL Watcher.
// used by WAL Watcher.
type QueueManager struct {
type QueueManager struct {
logger log . Logger
logger log . Logger
flushDeadline time . Duration
flushDeadline time . Duration
cfg config . QueueConfig
cfg config . QueueConfig
externalLabels model . LabelSet
externalLabels model . LabelSet
relabelConfigs [ ] * pkgrelabel . Config
relabelConfigs [ ] * pkgrelabel . Config
client StorageClient
client StorageClient
watcher * WALWatcher
queueName string
watcher * WALWatcher
highestSentTimestampMetric * maxGauge
pendingSamplesMetric prometheus . Gauge
enqueueRetriesMetric prometheus . Counter
seriesMtx sync . Mutex
seriesMtx sync . Mutex
seriesLabels map [ uint64 ] [ ] prompb . Label
seriesLabels map [ uint64 ] [ ] prompb . Label
@ -184,6 +179,16 @@ type QueueManager struct {
samplesIn , samplesDropped , samplesOut , samplesOutDuration * ewmaRate
samplesIn , samplesDropped , samplesOut , samplesOutDuration * ewmaRate
integralAccumulator float64
integralAccumulator float64
highestSentTimestampMetric * maxGauge
pendingSamplesMetric prometheus . Gauge
enqueueRetriesMetric prometheus . Counter
droppedSamplesTotal prometheus . Counter
numShardsMetric prometheus . Gauge
failedSamplesTotal prometheus . Counter
sentBatchDuration prometheus . Observer
succeededSamplesTotal prometheus . Counter
retriedSamplesTotal prometheus . Counter
}
}
// NewQueueManager builds a new QueueManager.
// NewQueueManager builds a new QueueManager.
@ -191,14 +196,16 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg
if logger == nil {
if logger == nil {
logger = log . NewNopLogger ( )
logger = log . NewNopLogger ( )
}
}
name := client . Name ( )
logger = log . With ( logger , "queue" , name )
t := & QueueManager {
t := & QueueManager {
logger : log . With ( logger , "queue" , client . Name ( ) ) ,
logger : log ger,
flushDeadline : flushDeadline ,
flushDeadline : flushDeadline ,
cfg : cfg ,
cfg : cfg ,
externalLabels : externalLabels ,
externalLabels : externalLabels ,
relabelConfigs : relabelConfigs ,
relabelConfigs : relabelConfigs ,
client : client ,
client : client ,
queueName : client . Name ( ) ,
seriesLabels : make ( map [ uint64 ] [ ] prompb . Label ) ,
seriesLabels : make ( map [ uint64 ] [ ] prompb . Label ) ,
seriesSegmentIndexes : make ( map [ uint64 ] int ) ,
seriesSegmentIndexes : make ( map [ uint64 ] int ) ,
@ -212,26 +219,25 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, cfg
samplesDropped : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
samplesDropped : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
samplesOut : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
samplesOut : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
samplesOutDuration : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
samplesOutDuration : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
}
t . highestSentTimestampMetric = & maxGauge {
highestSentTimestampMetric : & maxGauge {
Gauge : queueHighestSentTimestamp . WithLabelValues ( t . queueName ) ,
Gauge : queueHighestSentTimestamp . WithLabelValues ( name ) ,
} ,
pendingSamplesMetric : queuePendingSamples . WithLabelValues ( name ) ,
enqueueRetriesMetric : enqueueRetriesTotal . WithLabelValues ( name ) ,
droppedSamplesTotal : droppedSamplesTotal . WithLabelValues ( name ) ,
numShardsMetric : numShards . WithLabelValues ( name ) ,
failedSamplesTotal : failedSamplesTotal . WithLabelValues ( name ) ,
sentBatchDuration : sentBatchDuration . WithLabelValues ( name ) ,
succeededSamplesTotal : succeededSamplesTotal . WithLabelValues ( name ) ,
retriedSamplesTotal : retriedSamplesTotal . WithLabelValues ( name ) ,
}
}
t . pendingSamplesMetric = queuePendingSamples . WithLabelValues ( t . queueName )
t . enqueueRetriesMetric = enqueueRetriesTotal . WithLabelValues ( t . queueName )
t . watcher = NewWALWatcher ( logger , client . Name ( ) , t , walDir )
t . shards = t . newShards ( )
numShards . WithLabelValues ( t . queueName ) . Set ( float64 ( t . numShards ) )
t . watcher = NewWALWatcher ( logger , name , t , walDir )
shardCapacity . WithLabelValues ( t . queueName ) . Set ( float64 ( t . cfg . Capacity ) )
t . shards = t . newShards ( )
// Initialize counter labels to zero.
// Initialise some metrics.
sentBatchDuration . WithLabelValues ( t . queueName )
shardCapacity . WithLabelValues ( name ) . Set ( float64 ( t . cfg . Capacity ) )
succeededSamplesTotal . WithLabelValues ( t . queueName )
failedSamplesTotal . WithLabelValues ( t . queueName )
droppedSamplesTotal . WithLabelValues ( t . queueName )
retriedSamplesTotal . WithLabelValues ( t . queueName )
// Reset pending samples metric to 0.
t . pendingSamplesMetric . Set ( 0 )
t . pendingSamplesMetric . Set ( 0 )
return t
return t
@ -250,7 +256,7 @@ func (t *QueueManager) Append(s []tsdb.RefSample) bool {
for _ , sample := range s {
for _ , sample := range s {
// If we have no labels for the series, due to relabelling or otherwise, don't send the sample.
// If we have no labels for the series, due to relabelling or otherwise, don't send the sample.
if _ , ok := t . seriesLabels [ sample . Ref ] ; ! ok {
if _ , ok := t . seriesLabels [ sample . Ref ] ; ! ok {
droppedSamplesTotal. WithLabelValues ( t . queueName ) . Inc ( )
t. droppedSamplesTotal. Inc ( )
t . samplesDropped . incr ( 1 )
t . samplesDropped . incr ( 1 )
if _ , ok := t . droppedSeries [ sample . Ref ] ; ! ok {
if _ , ok := t . droppedSeries [ sample . Ref ] ; ! ok {
level . Info ( t . logger ) . Log ( "msg" , "dropped sample for series that was not explicitly dropped via relabelling" , "ref" , sample . Ref )
level . Info ( t . logger ) . Log ( "msg" , "dropped sample for series that was not explicitly dropped via relabelling" , "ref" , sample . Ref )
@ -523,7 +529,7 @@ func (s *shards) start(n int) {
for i := 0 ; i < n ; i ++ {
for i := 0 ; i < n ; i ++ {
go s . runShard ( hardShutdownCtx , i , newQueues [ i ] )
go s . runShard ( hardShutdownCtx , i , newQueues [ i ] )
}
}
numShards. WithLabelValues ( s . qm . queueName ) . Set ( float64 ( n ) )
s. qm . numShardsMetric . Set ( float64 ( n ) )
}
}
// stop the shards; subsequent call to enqueue will return false.
// stop the shards; subsequent call to enqueue will return false.
@ -652,7 +658,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries) {
err := s . sendSamplesWithBackoff ( ctx , samples )
err := s . sendSamplesWithBackoff ( ctx , samples )
if err != nil {
if err != nil {
level . Error ( s . qm . logger ) . Log ( "msg" , "non-recoverable error" , "count" , len ( samples ) , "err" , err )
level . Error ( s . qm . logger ) . Log ( "msg" , "non-recoverable error" , "count" , len ( samples ) , "err" , err )
failedSamplesTotal. WithLabelValues ( s . qm . queueName ) . Add ( float64 ( len ( samples ) ) )
s. qm . failedSamplesTotal. Add ( float64 ( len ( samples ) ) )
}
}
// These counters are used to calculate the dynamic sharding, and as such
// These counters are used to calculate the dynamic sharding, and as such
@ -680,10 +686,10 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
begin := time . Now ( )
begin := time . Now ( )
err := s . qm . client . Store ( ctx , req )
err := s . qm . client . Store ( ctx , req )
s entBatchDuration. WithLabelValues ( s . qm . queueName ) . Observe ( time . Since ( begin ) . Seconds ( ) )
s . qm . s entBatchDuration. Observe ( time . Since ( begin ) . Seconds ( ) )
if err == nil {
if err == nil {
s ucceededSamplesTotal. WithLabelValues ( s . qm . queueName ) . Add ( float64 ( len ( samples ) ) )
s . qm . s ucceededSamplesTotal. Add ( float64 ( len ( samples ) ) )
s . qm . highestSentTimestampMetric . Set ( float64 ( highest / 1000 ) )
s . qm . highestSentTimestampMetric . Set ( float64 ( highest / 1000 ) )
return nil
return nil
}
}
@ -691,7 +697,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
if _ , ok := err . ( recoverableError ) ; ! ok {
if _ , ok := err . ( recoverableError ) ; ! ok {
return err
return err
}
}
retriedSamplesTotal. WithLabelValues ( s . qm . queueName ) . Add ( float64 ( len ( samples ) ) )
s. qm . retriedSamplesTotal. Add ( float64 ( len ( samples ) ) )
level . Debug ( s . qm . logger ) . Log ( "msg" , "failed to send batch, retrying" , "err" , err )
level . Debug ( s . qm . logger ) . Log ( "msg" , "failed to send batch, retrying" , "err" , err )
time . Sleep ( time . Duration ( backoff ) )
time . Sleep ( time . Duration ( backoff ) )