// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"
)

const (
	// The maximum number of concurrent send requests to the remote storage.
	maxConcurrentSends = 10
	// The maximum number of samples to fit into a single request to the remote storage.
	maxSamplesPerSend = 100
	// The deadline after which to send queued samples even if the maximum batch
	// size has not been reached.
	batchSendDeadline = 5 * time.Second
)

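// Taken together, these limits mean that at most maxConcurrentSends batches,
// each of at most maxSamplesPerSend samples, are sent to the remote storage
// concurrently; samples arriving while the queue is full are dropped by
// Append rather than blocking ingestion.
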
// String constants for instrumentation.
const (
	namespace = "prometheus"
	subsystem = "remote_storage"

	result  = "result"
	success = "success"
	failure = "failure"
	dropped = "dropped"
)

// StorageClient defines an interface for sending a batch of samples to an
// external timeseries database.
type StorageClient interface {
	// Store stores the given samples in the remote storage.
	Store(model.Samples) error
	// Name identifies the remote storage implementation.
	Name() string
}

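// As an illustration of the interface, a minimal no-op client could look
// roughly like this (the nopClient name is purely hypothetical and not part
// of this package):
//
//	type nopClient struct{}
//
//	func (nopClient) Store(model.Samples) error { return nil }
//	func (nopClient) Name() string              { return "nop" }
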
// StorageQueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided StorageClient.
type StorageQueueManager struct {
	tsdb           StorageClient
	queue          chan *model.Sample
	pendingSamples model.Samples
	sendSemaphore  chan bool
	drained        chan bool

	samplesCount  *prometheus.CounterVec
	sendLatency   prometheus.Summary
	failedBatches prometheus.Counter
	failedSamples prometheus.Counter
	queueLength   prometheus.Gauge
	queueCapacity prometheus.Metric
}

// NewStorageQueueManager builds a new StorageQueueManager.
func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueueManager {
	constLabels := prometheus.Labels{
		"type": tsdb.Name(),
	}

	return &StorageQueueManager{
		tsdb:          tsdb,
		queue:         make(chan *model.Sample, queueCapacity),
		sendSemaphore: make(chan bool, maxConcurrentSends),
		drained:       make(chan bool),

		samplesCount: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace:   namespace,
				Subsystem:   subsystem,
				Name:        "sent_samples_total",
				Help:        "Total number of processed samples to be sent to remote storage.",
				ConstLabels: constLabels,
			},
			[]string{result},
		),
		sendLatency: prometheus.NewSummary(prometheus.SummaryOpts{
			Namespace:   namespace,
			Subsystem:   subsystem,
			Name:        "send_latency_seconds",
			Help:        "Latency quantiles for sending sample batches to the remote storage.",
			ConstLabels: constLabels,
		}),
		failedBatches: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace:   namespace,
			Subsystem:   subsystem,
			Name:        "failed_batches_total",
			Help:        "Total number of sample batches that encountered an error while being sent to the remote storage.",
			ConstLabels: constLabels,
		}),
		failedSamples: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace:   namespace,
			Subsystem:   subsystem,
			Name:        "failed_samples_total",
			Help:        "Total number of samples that encountered an error while being sent to the remote storage.",
			ConstLabels: constLabels,
		}),
		queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace:   namespace,
			Subsystem:   subsystem,
			Name:        "queue_length",
			Help:        "The number of processed samples queued to be sent to the remote storage.",
			ConstLabels: constLabels,
		}),
		queueCapacity: prometheus.MustNewConstMetric(
			prometheus.NewDesc(
				prometheus.BuildFQName(namespace, subsystem, "queue_capacity"),
				"The capacity of the queue of samples to be sent to the remote storage.",
				nil,
				constLabels,
			),
			prometheus.GaugeValue,
			float64(queueCapacity),
		),
	}
}

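// A typical lifecycle, sketched here only as an illustration (someClient
// stands in for any StorageClient implementation):
//
//	m := NewStorageQueueManager(someClient, 100*maxSamplesPerSend)
//	prometheus.MustRegister(m) // expose the queue metrics
//	go m.Run()                 // start draining the queue
//	m.Append(&model.Sample{})  // called per ingested sample
//	m.Stop()                   // flush and wait for pending sends
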
// Append queues a sample to be sent to the remote storage. It drops the
// sample on the floor if the queue is full. It implements
// storage.SampleAppender.
func (t *StorageQueueManager) Append(s *model.Sample) {
	select {
	case t.queue <- s:
	default:
		t.samplesCount.WithLabelValues(dropped).Inc()
		log.Warn("Remote storage queue full, discarding sample.")
	}
}

// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func (t *StorageQueueManager) Stop() {
	log.Info("Stopping remote storage...")
	close(t.queue)
	<-t.drained
	// Acquire every semaphore slot so that any in-flight sends have finished
	// before we return.
	for i := 0; i < maxConcurrentSends; i++ {
		t.sendSemaphore <- true
	}
	log.Info("Remote storage stopped.")
}

// Describe implements prometheus.Collector.
func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) {
	t.samplesCount.Describe(ch)
	t.sendLatency.Describe(ch)
	ch <- t.failedBatches.Desc()
	ch <- t.failedSamples.Desc()
	ch <- t.queueLength.Desc()
	ch <- t.queueCapacity.Desc()
}

// Collect implements prometheus.Collector.
func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
	t.samplesCount.Collect(ch)
	t.sendLatency.Collect(ch)
	t.queueLength.Set(float64(len(t.queue)))
	ch <- t.failedBatches
	ch <- t.failedSamples
	ch <- t.queueLength
	ch <- t.queueCapacity
}

func (t *StorageQueueManager) sendSamples(s model.Samples) {
	t.sendSemaphore <- true
	defer func() {
		<-t.sendSemaphore
	}()

	// Samples are sent to the remote storage on a best-effort basis. If a
	// sample isn't sent correctly the first time, it's simply dropped on the
	// floor.
	begin := time.Now()
	err := t.tsdb.Store(s)
	// Use Seconds() to keep sub-second precision instead of truncating the
	// latency to whole seconds.
	duration := time.Since(begin).Seconds()

	labelValue := success
	if err != nil {
		log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
		labelValue = failure
		t.failedBatches.Inc()
		t.failedSamples.Add(float64(len(s)))
	}
	t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
	t.sendLatency.Observe(duration)
}

// Run continuously sends samples to the remote storage.
func (t *StorageQueueManager) Run() {
	defer func() {
		close(t.drained)
	}()

	// Send batches of at most maxSamplesPerSend samples to the remote storage.
	// If we have fewer samples than that, flush them out after a deadline
	// anyway.
	for {
		select {
		case s, ok := <-t.queue:
			if !ok {
				log.Infof("Flushing %d samples to remote storage...", len(t.pendingSamples))
				t.flush()
				log.Info("Done flushing.")
				return
			}

			t.pendingSamples = append(t.pendingSamples, s)

			for len(t.pendingSamples) >= maxSamplesPerSend {
				go t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
				t.pendingSamples = t.pendingSamples[maxSamplesPerSend:]
			}
		case <-time.After(batchSendDeadline):
			t.flush()
		}
	}
}

// flush flushes remaining queued samples.
func (t *StorageQueueManager) flush() {
	if len(t.pendingSamples) > 0 {
		go t.sendSamples(t.pendingSamples)
	}
	// Start a fresh slice rather than re-slicing to length zero, so that the
	// batch just handed to sendSamples is not overwritten by later appends
	// sharing the same backing array.
	t.pendingSamples = nil
}