|
|
|
@ -179,26 +179,29 @@ func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
|
|
|
|
|
|
|
|
|
|
func (t *StorageQueueManager) sendSamples(s model.Samples) {
|
|
|
|
|
t.sendSemaphore <- true
|
|
|
|
|
defer func() {
|
|
|
|
|
<-t.sendSemaphore
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Samples are sent to the remote storage on a best-effort basis. If a
|
|
|
|
|
// sample isn't sent correctly the first time, it's simply dropped on the
|
|
|
|
|
// floor.
|
|
|
|
|
begin := time.Now()
|
|
|
|
|
err := t.tsdb.Store(s)
|
|
|
|
|
duration := time.Since(begin) / time.Second
|
|
|
|
|
|
|
|
|
|
labelValue := success
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
|
|
|
|
|
labelValue = failure
|
|
|
|
|
t.failedBatches.Inc()
|
|
|
|
|
t.failedSamples.Add(float64(len(s)))
|
|
|
|
|
}
|
|
|
|
|
t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
|
|
|
|
|
t.sendLatency.Observe(float64(duration))
|
|
|
|
|
go func() {
|
|
|
|
|
defer func() {
|
|
|
|
|
<-t.sendSemaphore
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Samples are sent to the remote storage on a best-effort basis. If a
|
|
|
|
|
// sample isn't sent correctly the first time, it's simply dropped on the
|
|
|
|
|
// floor.
|
|
|
|
|
begin := time.Now()
|
|
|
|
|
err := t.tsdb.Store(s)
|
|
|
|
|
duration := time.Since(begin) / time.Second
|
|
|
|
|
|
|
|
|
|
labelValue := success
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
|
|
|
|
|
labelValue = failure
|
|
|
|
|
t.failedBatches.Inc()
|
|
|
|
|
t.failedSamples.Add(float64(len(s)))
|
|
|
|
|
}
|
|
|
|
|
t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
|
|
|
|
|
t.sendLatency.Observe(float64(duration))
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Run continuously sends samples to the remote storage.
|
|
|
|
@ -223,7 +226,7 @@ func (t *StorageQueueManager) Run() {
|
|
|
|
|
t.pendingSamples = append(t.pendingSamples, s)
|
|
|
|
|
|
|
|
|
|
for len(t.pendingSamples) >= maxSamplesPerSend {
|
|
|
|
|
go t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
|
|
|
|
|
t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
|
|
|
|
|
t.pendingSamples = t.pendingSamples[maxSamplesPerSend:]
|
|
|
|
|
}
|
|
|
|
|
case <-time.After(batchSendDeadline):
|
|
|
|
@ -235,7 +238,7 @@ func (t *StorageQueueManager) Run() {
|
|
|
|
|
// Flush flushes remaining queued samples.
|
|
|
|
|
func (t *StorageQueueManager) flush() {
|
|
|
|
|
if len(t.pendingSamples) > 0 {
|
|
|
|
|
go t.sendSamples(t.pendingSamples)
|
|
|
|
|
t.sendSamples(t.pendingSamples)
|
|
|
|
|
}
|
|
|
|
|
t.pendingSamples = t.pendingSamples[:0]
|
|
|
|
|
}
|
|
|
|
|