|
|
|
@ -454,7 +454,7 @@ func NewQueueManager(
|
|
|
|
|
logger = log.NewNopLogger() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Copy externalLabels into slice which we need for processExternalLabels.
|
|
|
|
|
// Copy externalLabels into a slice, which we need for processExternalLabels.
|
|
|
|
|
extLabelsSlice := make([]labels.Label, 0, externalLabels.Len()) |
|
|
|
|
externalLabels.Range(func(l labels.Label) { |
|
|
|
|
extLabelsSlice = append(extLabelsSlice, l) |
|
|
|
@ -499,7 +499,7 @@ func NewQueueManager(
|
|
|
|
|
return t |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// AppendMetadata sends metadata the remote storage. Metadata is sent in batches, but is not parallelized.
|
|
|
|
|
// AppendMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized.
|
|
|
|
|
func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) { |
|
|
|
|
mm := make([]prompb.MetricMetadata, 0, len(metadata)) |
|
|
|
|
for _, entry := range metadata { |
|
|
|
@ -944,7 +944,7 @@ func (t *QueueManager) updateShardsLoop() {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// shouldReshard returns if resharding should occur
|
|
|
|
|
// shouldReshard returns whether resharding should occur.
|
|
|
|
|
func (t *QueueManager) shouldReshard(desiredShards int) bool { |
|
|
|
|
if desiredShards == t.numShards { |
|
|
|
|
return false |
|
|
|
@ -1123,7 +1123,7 @@ func (s *shards) start(n int) {
|
|
|
|
|
// stop the shards; subsequent calls to enqueue will return false.
|
|
|
|
|
func (s *shards) stop() { |
|
|
|
|
// Attempt a clean shutdown, but only wait flushDeadline for all the shards
|
|
|
|
|
// to cleanly exit. As we're doing RPCs, enqueue can block indefinitely.
|
|
|
|
|
// to cleanly exit. As we're doing RPCs, enqueue can block indefinitely.
|
|
|
|
|
// We must be able to call stop concurrently, hence we can only take the
|
|
|
|
|
// RLock here.
|
|
|
|
|
s.mtx.RLock() |
|
|
|
@ -1471,7 +1471,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
|
|
|
|
|
s.qm.dataOut.incr(int64(len(samples))) |
|
|
|
|
s.qm.dataOutDuration.incr(int64(time.Since(begin))) |
|
|
|
|
s.qm.lastSendTimestamp.Store(time.Now().Unix()) |
|
|
|
|
// Pending samples/exemplars/histograms also should be subtracted as an error means
|
|
|
|
|
// Pending samples/exemplars/histograms also should be subtracted, as an error means
|
|
|
|
|
// they will not be retried.
|
|
|
|
|
s.qm.metrics.pendingSamples.Sub(float64(sampleCount)) |
|
|
|
|
s.qm.metrics.pendingExemplars.Sub(float64(exemplarCount)) |
|
|
|
|