mirror of https://github.com/prometheus/prometheus
parent 50878ebe5e
commit bd6436605d
@@ -1011,11 +1011,10 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
 		if !appended {
 			return false
 		}
-		switch data.isSample {
-		case true:
+		if data.isSample {
 			s.qm.metrics.pendingSamples.Inc()
 			s.enqueuedSamples.Inc()
-		case false:
+		} else {
 			s.qm.metrics.pendingExemplars.Inc()
 			s.enqueuedExemplars.Inc()
 		}
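This hunk and the next one make the same change: a switch over a boolean is replaced by a plain if/else, the more idiomatic Go form for a two-way branch. Below is a minimal, self-contained sketch of the pattern; the function and counter names are placeholders for illustration, not the queue manager's real fields.

package main

import "fmt"

// countItem mirrors the shape of the change: branch on a boolean flag and
// bump one of two counters, using if/else rather than switch on true/false.
func countItem(isSample bool, samples, exemplars *int) {
	if isSample {
		*samples++
	} else {
		*exemplars++
	}
}

func main() {
	var samples, exemplars int
	for _, isSample := range []bool{true, true, false} {
		countItem(isSample, &samples, &exemplars)
	}
	fmt.Println(samples, exemplars) // prints: 2 1
}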
@@ -1220,15 +1219,14 @@ func (s *shards) populateTimeSeries(batch []sampleOrExemplar, pendingData []prom
 		// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
 		// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
 		// stop reading from the queue. This makes it safe to reference pendingSamples by index.
-		switch d.isSample {
-		case true:
+		if d.isSample {
 			pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
 			pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
 				Value:     d.value,
 				Timestamp: d.timestamp,
 			})
 			nPendingSamples++
-		case false:
+		} else {
 			pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
 			pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
 				Labels: labelsToLabelsProto(d.exemplarLabels, nil),
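The comment kept in this hunk carries the key invariant: because sendSamples (via sendSamplesWithBackoff) retries endlessly, the shard stops reading from its queue once the pending buffer is full, so indexing pendingData[nPending] stays in bounds. A rough sketch of that bounded-buffer-plus-blocking-flush shape follows, with placeholder names and a string channel standing in for the real queue; this is not Prometheus's actual loop.

package main

import (
	"fmt"
	"time"
)

// flushWithRetry stands in for sendSamplesWithBackoff: it retries until the
// batch is accepted, so the caller stops pulling new work while a send is stuck.
func flushWithRetry(send func([]string) error, batch []string) {
	for {
		if err := send(batch); err == nil {
			return
		}
		time.Sleep(10 * time.Millisecond) // stand-in for real backoff
	}
}

// runLoop fills a fixed-size pending buffer and flushes when it is full.
// Because flushWithRetry blocks until the send succeeds, pending[nPending]
// is always a valid index, which is the invariant the comment describes.
func runLoop(in <-chan string, send func([]string) error, maxPending int) {
	pending := make([]string, maxPending)
	nPending := 0
	for item := range in {
		pending[nPending] = item
		nPending++
		if nPending == maxPending {
			flushWithRetry(send, pending)
			nPending = 0
		}
	}
	if nPending > 0 {
		flushWithRetry(send, pending[:nPending])
	}
}

func main() {
	in := make(chan string, 3)
	for _, s := range []string{"a", "b", "c"} {
		in <- s
	}
	close(in)
	runLoop(in, func(batch []string) error {
		fmt.Println("sent", batch)
		return nil
	}, 2)
}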
@@ -578,22 +578,6 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
 	}
 }
 
-func (c *TestWriteClient) expectDataCount(numSamples int) {
-	if !c.withWaitGroup {
-		return
-	}
-	c.mtx.Lock()
-	defer c.mtx.Unlock()
-	c.wg.Add(numSamples)
-}
-
-func (c *TestWriteClient) waitForExpectedDataCount() {
-	if !c.withWaitGroup {
-		return
-	}
-	c.wg.Wait()
-}
-
 func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
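The test-side hunk deletes the expectDataCount / waitForExpectedDataCount helpers, which implemented a simple sync.WaitGroup count-expectation pattern: the test registers how many samples it expects, the client marks one expectation done per received sample, and the test blocks until the count is reached. A self-contained sketch of that pattern is below; the type and method names are illustrative, not the real TestWriteClient API.

package main

import (
	"fmt"
	"sync"
)

// countingClient sketches the WaitGroup pattern the deleted helpers used.
type countingClient struct {
	mtx sync.Mutex
	wg  sync.WaitGroup
}

// expectDataCount registers how many samples the test expects to receive.
func (c *countingClient) expectDataCount(n int) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.wg.Add(n)
}

// store marks one expectation done per received sample.
func (c *countingClient) store(samples []float64) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for range samples {
		c.wg.Done()
	}
}

// waitForExpectedDataCount blocks until every expected sample has arrived.
func (c *countingClient) waitForExpectedDataCount() {
	c.wg.Wait()
}

func main() {
	c := &countingClient{}
	c.expectDataCount(3)
	go c.store([]float64{1, 2, 3})
	c.waitForExpectedDataCount()
	fmt.Println("all expected samples received")
}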