Merge pull request #1931 from tomwilkie/1843-remote-storage-ordering

Remote storage ordering
pull/1938/head
Julius Volz 2016-08-30 17:49:13 +02:00 committed by GitHub
commit 1c271897be
3 changed files with 236 additions and 159 deletions

View File

@ -14,6 +14,7 @@
package remote
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -21,16 +22,6 @@ import (
"github.com/prometheus/common/model"
)
const (
// The maximum number of concurrent send requests to the remote storage.
maxConcurrentSends = 10
// The maximum number of samples to fit into a single request to the remote storage.
maxSamplesPerSend = 100
// The deadline after which to send queued samples even if the maximum batch
// size has not been reached.
batchSendDeadline = 5 * time.Second
)
// String constants for instrumentation.
const (
namespace = "prometheus"
@ -51,66 +42,77 @@ type StorageClient interface {
Name() string
}
type StorageQueueManagerConfig struct {
QueueCapacity int // Number of samples to buffer per shard before we start dropping them.
Shards int // Number of shards, i.e. amount of concurrency.
MaxSamplesPerSend int // Maximum number of samples per send.
BatchSendDeadline time.Duration // Maximum time sample will wait in buffer.
}
var defaultConfig = StorageQueueManagerConfig{
QueueCapacity: 100 * 1024 / 10,
Shards: 10,
MaxSamplesPerSend: 100,
BatchSendDeadline: 5 * time.Second,
}
// StorageQueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided StorageClient.
type StorageQueueManager struct {
tsdb StorageClient
queue chan *model.Sample
pendingSamples model.Samples
sendSemaphore chan bool
drained chan bool
cfg StorageQueueManagerConfig
tsdb StorageClient
shards []chan *model.Sample
wg sync.WaitGroup
done chan struct{}
samplesCount *prometheus.CounterVec
sendLatency prometheus.Summary
failedBatches prometheus.Counter
failedSamples prometheus.Counter
queueLength prometheus.Gauge
queueCapacity prometheus.Metric
sentSamplesTotal *prometheus.CounterVec
sentBatchDuration *prometheus.HistogramVec
queueLength prometheus.Gauge
queueCapacity prometheus.Metric
}
// NewStorageQueueManager builds a new StorageQueueManager.
func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueueManager {
func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig) *StorageQueueManager {
constLabels := prometheus.Labels{
"type": tsdb.Name(),
}
return &StorageQueueManager{
tsdb: tsdb,
queue: make(chan *model.Sample, queueCapacity),
sendSemaphore: make(chan bool, maxConcurrentSends),
drained: make(chan bool),
if cfg == nil {
cfg = &defaultConfig
}
samplesCount: prometheus.NewCounterVec(
shards := make([]chan *model.Sample, cfg.Shards)
for i := 0; i < cfg.Shards; i++ {
shards[i] = make(chan *model.Sample, cfg.QueueCapacity)
}
t := &StorageQueueManager{
cfg: *cfg,
tsdb: tsdb,
shards: shards,
done: make(chan struct{}),
sentSamplesTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_samples_total",
Help: "Total number of processed samples to be sent to remote storage.",
Help: "Total number of processed samples sent to remote storage.",
ConstLabels: constLabels,
},
[]string{result},
),
sendLatency: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "send_latency_seconds",
Help: "Latency quantiles for sending sample batches to the remote storage.",
ConstLabels: constLabels,
}),
failedBatches: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "failed_batches_total",
Help: "Total number of sample batches that encountered an error while being sent to the remote storage.",
ConstLabels: constLabels,
}),
failedSamples: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "failed_samples_total",
Help: "Total number of samples that encountered an error while being sent to the remote storage.",
ConstLabels: constLabels,
}),
sentBatchDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_batch_duration_seconds",
Help: "Duration of sample batch send calls to the remote storage.",
ConstLabels: constLabels,
Buckets: prometheus.DefBuckets,
},
[]string{result},
),
queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
@ -126,119 +128,131 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue
constLabels,
),
prometheus.GaugeValue,
float64(queueCapacity),
float64(cfg.QueueCapacity*cfg.Shards),
),
}
t.wg.Add(cfg.Shards)
return t
}
// Append queues a sample to be sent to the remote storage. It drops the
// sample on the floor if the queue is full.
// Always returns nil.
func (t *StorageQueueManager) Append(s *model.Sample) error {
fp := s.Metric.FastFingerprint()
shard := uint64(fp) % uint64(t.cfg.Shards)
select {
case t.queue <- s:
case t.shards[shard] <- s:
default:
t.samplesCount.WithLabelValues(dropped).Inc()
t.sentSamplesTotal.WithLabelValues(dropped).Inc()
log.Warn("Remote storage queue full, discarding sample.")
}
return nil
}
// NeedsThrottling implements storage.SampleAppender. It will always return
// false as a remote storage drops samples on the floor if backlogging instead
// of asking for throttling.
func (*StorageQueueManager) NeedsThrottling() bool {
return false
}
// Describe implements prometheus.Collector.
func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) {
t.sentSamplesTotal.Describe(ch)
t.sentBatchDuration.Describe(ch)
ch <- t.queueLength.Desc()
ch <- t.queueCapacity.Desc()
}
// QueueLength returns the number of outstanding samples in the queue.
func (t *StorageQueueManager) queueLen() int {
queueLength := 0
for _, shard := range t.shards {
queueLength += len(shard)
}
return queueLength
}
// Collect implements prometheus.Collector.
func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
t.sentSamplesTotal.Collect(ch)
t.sentBatchDuration.Collect(ch)
t.queueLength.Set(float64(t.queueLen()))
ch <- t.queueLength
ch <- t.queueCapacity
}
// Run continuously sends samples to the remote storage.
func (t *StorageQueueManager) Run() {
for i := 0; i < t.cfg.Shards; i++ {
go t.runShard(i)
}
t.wg.Wait()
}
// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func (t *StorageQueueManager) Stop() {
log.Infof("Stopping remote storage...")
close(t.queue)
<-t.drained
for i := 0; i < maxConcurrentSends; i++ {
t.sendSemaphore <- true
for _, shard := range t.shards {
close(shard)
}
t.wg.Wait()
log.Info("Remote storage stopped.")
}
// Describe implements prometheus.Collector.
func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) {
t.samplesCount.Describe(ch)
t.sendLatency.Describe(ch)
ch <- t.failedBatches.Desc()
ch <- t.failedSamples.Desc()
ch <- t.queueLength.Desc()
ch <- t.queueCapacity.Desc()
}
func (t *StorageQueueManager) runShard(i int) {
defer t.wg.Done()
shard := t.shards[i]
// Collect implements prometheus.Collector.
func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
t.samplesCount.Collect(ch)
t.sendLatency.Collect(ch)
t.queueLength.Set(float64(len(t.queue)))
ch <- t.failedBatches
ch <- t.failedSamples
ch <- t.queueLength
ch <- t.queueCapacity
}
func (t *StorageQueueManager) sendSamples(s model.Samples) {
t.sendSemaphore <- true
go func() {
defer func() {
<-t.sendSemaphore
}()
// Samples are sent to the remote storage on a best-effort basis. If a
// sample isn't sent correctly the first time, it's simply dropped on the
// floor.
begin := time.Now()
err := t.tsdb.Store(s)
duration := time.Since(begin).Seconds()
labelValue := success
if err != nil {
log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
labelValue = failure
t.failedBatches.Inc()
t.failedSamples.Add(float64(len(s)))
}
t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
t.sendLatency.Observe(duration)
}()
}
// Run continuously sends samples to the remote storage.
func (t *StorageQueueManager) Run() {
defer func() {
close(t.drained)
}()
// Send batches of at most maxSamplesPerSend samples to the remote storage.
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline
// anyways.
pendingSamples := model.Samples{}
for {
select {
case s, ok := <-t.queue:
case s, ok := <-shard:
if !ok {
log.Infof("Flushing %d samples to remote storage...", len(t.pendingSamples))
t.flush()
log.Infof("Done flushing.")
if len(pendingSamples) > 0 {
log.Infof("Flushing %d samples to remote storage...", len(pendingSamples))
t.sendSamples(pendingSamples)
log.Infof("Done flushing.")
}
return
}
t.pendingSamples = append(t.pendingSamples, s)
pendingSamples = append(pendingSamples, s)
for len(t.pendingSamples) >= maxSamplesPerSend {
t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
t.pendingSamples = t.pendingSamples[maxSamplesPerSend:]
for len(pendingSamples) >= t.cfg.MaxSamplesPerSend {
t.sendSamples(pendingSamples[:t.cfg.MaxSamplesPerSend])
pendingSamples = pendingSamples[t.cfg.MaxSamplesPerSend:]
}
case <-time.After(t.cfg.BatchSendDeadline):
if len(pendingSamples) > 0 {
t.sendSamples(pendingSamples)
pendingSamples = pendingSamples[:0]
}
case <-time.After(batchSendDeadline):
t.flush()
}
}
}
// Flush flushes remaining queued samples.
func (t *StorageQueueManager) flush() {
if len(t.pendingSamples) > 0 {
t.sendSamples(t.pendingSamples)
func (t *StorageQueueManager) sendSamples(s model.Samples) {
// Samples are sent to the remote storage on a best-effort basis. If a
// sample isn't sent correctly the first time, it's simply dropped on the
// floor.
begin := time.Now()
err := t.tsdb.Store(s)
duration := time.Since(begin).Seconds()
labelValue := success
if err != nil {
log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
labelValue = failure
}
t.pendingSamples = t.pendingSamples[:0]
t.sentSamplesTotal.WithLabelValues(labelValue).Add(float64(len(s)))
t.sentBatchDuration.WithLabelValues(labelValue).Observe(duration)
}

View File

@ -14,6 +14,7 @@
package remote
import (
"fmt"
"sync"
"sync/atomic"
"testing"
@ -23,28 +24,53 @@ import (
)
type TestStorageClient struct {
receivedSamples model.Samples
expectedSamples model.Samples
receivedSamples map[string]model.Samples
expectedSamples map[string]model.Samples
wg sync.WaitGroup
mtx sync.Mutex
}
func (c *TestStorageClient) expectSamples(s model.Samples) {
c.expectedSamples = append(c.expectedSamples, s...)
c.wg.Add(len(s))
func NewTestStorageClient() *TestStorageClient {
return &TestStorageClient{
receivedSamples: map[string]model.Samples{},
expectedSamples: map[string]model.Samples{},
}
}
func (c *TestStorageClient) expectSamples(ss model.Samples) {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, s := range ss {
ts := s.Metric.String()
c.expectedSamples[ts] = append(c.expectedSamples[ts], s)
}
c.wg.Add(len(ss))
}
func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
c.wg.Wait()
for i, expected := range c.expectedSamples {
if !expected.Equal(c.receivedSamples[i]) {
t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[i])
c.mtx.Lock()
defer c.mtx.Unlock()
for ts, expectedSamples := range c.expectedSamples {
for i, expected := range expectedSamples {
if !expected.Equal(c.receivedSamples[ts][i]) {
t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i])
}
}
}
}
func (c *TestStorageClient) Store(s model.Samples) error {
c.receivedSamples = append(c.receivedSamples, s...)
c.wg.Add(-len(s))
func (c *TestStorageClient) Store(ss model.Samples) error {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, s := range ss {
ts := s.Metric.String()
c.receivedSamples[ts] = append(c.receivedSamples[ts], s)
}
c.wg.Add(-len(ss))
return nil
}
@ -55,21 +81,24 @@ func (c *TestStorageClient) Name() string {
func TestSampleDelivery(t *testing.T) {
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := maxSamplesPerSend * 2
cfg := defaultConfig
n := cfg.QueueCapacity * 2
cfg.Shards = 1
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
samples = append(samples, &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: "test_metric",
model.MetricNameLabel: name,
},
Value: model.SampleValue(i),
})
}
c := &TestStorageClient{}
c := NewTestStorageClient()
c.expectSamples(samples[:len(samples)/2])
m := NewStorageQueueManager(c, len(samples)/2)
m := NewStorageQueueManager(c, &cfg)
// These should be received by the client.
for _, s := range samples[:len(samples)/2] {
@ -85,6 +114,39 @@ func TestSampleDelivery(t *testing.T) {
c.waitForExpectedSamples(t)
}
func TestSampleDeliveryOrder(t *testing.T) {
cfg := defaultConfig
ts := 10
n := cfg.MaxSamplesPerSend * ts
// Ensure we don't drop samples in this test.
cfg.QueueCapacity = n
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts))
samples = append(samples, &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: name,
},
Value: model.SampleValue(i),
Timestamp: model.Time(i),
})
}
c := NewTestStorageClient()
c.expectSamples(samples)
m := NewStorageQueueManager(c, &cfg)
// These should be received by the client.
for _, s := range samples {
m.Append(s)
}
go m.Run()
defer m.Stop()
c.waitForExpectedSamples(t)
}
// TestBlockingStorageClient is a queue_manager StorageClient which will block
// on any calls to Store(), until the `block` channel is closed, at which point
// the `numCalls` property will contain a count of how many times Store() was
@ -121,24 +183,26 @@ func (c *TestBlockingStorageClient) Name() string {
func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
// Our goal is to fully empty the queue:
// `maxSamplesPerSend*maxConcurrentSends` samples should be consumed by the
// semaphore-controlled goroutines, and then another `maxSamplesPerSend`
// should be consumed by the Run() loop calling sendSample and immediately
// blocking.
n := maxSamplesPerSend*maxConcurrentSends + maxSamplesPerSend
// `MaxSamplesPerSend*Shards` samples should be consumed by the
// per-shard goroutines, and then another `MaxSamplesPerSend`
// should be left on the queue.
cfg := defaultConfig
n := cfg.MaxSamplesPerSend*cfg.Shards + cfg.MaxSamplesPerSend
cfg.QueueCapacity = n
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
samples = append(samples, &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: "test_metric",
model.MetricNameLabel: name,
},
Value: model.SampleValue(i),
})
}
c := NewTestBlockedStorageClient()
m := NewStorageQueueManager(c, n)
m := NewStorageQueueManager(c, &cfg)
go m.Run()
@ -151,7 +215,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
m.Append(s)
}
// Wait until the Run() loop drains the queue. If things went right, it
// Wait until the runShard() loops drain the queue. If things went right, it
// should then immediately block in sendSamples(), but, in case of error,
// it would spawn too many goroutines, and thus we'd see more calls to
// client.Store()
@ -163,19 +227,18 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
// draining the queue. We cap the waiting at 1 second -- that should give
// plenty of time, and keeps the failure fairly quick if we're not draining
// the queue properly.
for i := 0; i < 100 && len(m.queue) > 0; i++ {
for i := 0; i < 100 && m.queueLen() > 0; i++ {
time.Sleep(10 * time.Millisecond)
}
if len(m.queue) > 0 {
if m.queueLen() != cfg.MaxSamplesPerSend {
t.Fatalf("Failed to drain StorageQueueManager queue, %d elements left",
len(m.queue),
m.queueLen(),
)
}
numCalls := c.NumCalls()
if numCalls != maxConcurrentSends {
t.Errorf("Saw %d concurrent sends, expected %d", numCalls, maxConcurrentSends)
if numCalls != uint64(cfg.Shards) {
t.Errorf("Saw %d concurrent sends, expected %d", numCalls, cfg.Shards)
}
}

View File

@ -52,11 +52,11 @@ func New(o *Options) (*Storage, error) {
c := graphite.NewClient(
o.GraphiteAddress, o.GraphiteTransport,
o.StorageTimeout, o.GraphitePrefix)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
}
if o.OpentsdbURL != "" {
c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
}
if o.InfluxdbURL != nil {
conf := influx.Config{
@ -67,14 +67,14 @@ func New(o *Options) (*Storage, error) {
}
c := influxdb.NewClient(conf, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy)
prometheus.MustRegister(c)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
}
if o.Address != "" {
c, err := NewClient(o.Address, o.StorageTimeout)
if err != nil {
return nil, err
}
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
}
if len(s.queues) == 0 {
return nil, nil