diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index d89e9406d..79ee8cf9e 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -14,6 +14,7 @@ package remote
 
 import (
+    "sync"
     "time"
 
     "github.com/prometheus/client_golang/prometheus"
@@ -21,16 +22,6 @@ import (
     "github.com/prometheus/common/model"
 )
 
-const (
-    // The maximum number of concurrent send requests to the remote storage.
-    maxConcurrentSends = 10
-    // The maximum number of samples to fit into a single request to the remote storage.
-    maxSamplesPerSend = 100
-    // The deadline after which to send queued samples even if the maximum batch
-    // size has not been reached.
-    batchSendDeadline = 5 * time.Second
-)
-
 // String constants for instrumentation.
 const (
     namespace = "prometheus"
@@ -51,66 +42,77 @@ type StorageClient interface {
     Name() string
 }
 
+type StorageQueueManagerConfig struct {
+    QueueCapacity     int           // Number of samples to buffer per shard before we start dropping them.
+    Shards            int           // Number of shards, i.e. amount of concurrency.
+    MaxSamplesPerSend int           // Maximum number of samples per send.
+    BatchSendDeadline time.Duration // Maximum time a sample will wait in buffer.
+}
+
+var defaultConfig = StorageQueueManagerConfig{
+    QueueCapacity:     100 * 1024 / 10,
+    Shards:            10,
+    MaxSamplesPerSend: 100,
+    BatchSendDeadline: 5 * time.Second,
+}
+
 // StorageQueueManager manages a queue of samples to be sent to the Storage
 // indicated by the provided StorageClient.
 type StorageQueueManager struct {
-    tsdb           StorageClient
-    queue          chan *model.Sample
-    pendingSamples model.Samples
-    sendSemaphore  chan bool
-    drained        chan bool
+    cfg    StorageQueueManagerConfig
+    tsdb   StorageClient
+    shards []chan *model.Sample
+    wg     sync.WaitGroup
+    done   chan struct{}
 
-    samplesCount  *prometheus.CounterVec
-    sendLatency   prometheus.Summary
-    failedBatches prometheus.Counter
-    failedSamples prometheus.Counter
-    queueLength   prometheus.Gauge
-    queueCapacity prometheus.Metric
+    sentSamplesTotal  *prometheus.CounterVec
+    sentBatchDuration *prometheus.HistogramVec
+    queueLength       prometheus.Gauge
+    queueCapacity     prometheus.Metric
 }
 
 // NewStorageQueueManager builds a new StorageQueueManager.
-func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueueManager {
+func NewStorageQueueManager(tsdb StorageClient, cfg *StorageQueueManagerConfig) *StorageQueueManager {
     constLabels := prometheus.Labels{
         "type": tsdb.Name(),
     }
 
-    return &StorageQueueManager{
-        tsdb:          tsdb,
-        queue:         make(chan *model.Sample, queueCapacity),
-        sendSemaphore: make(chan bool, maxConcurrentSends),
-        drained:       make(chan bool),
+    if cfg == nil {
+        cfg = &defaultConfig
+    }
 
-        samplesCount: prometheus.NewCounterVec(
+    shards := make([]chan *model.Sample, cfg.Shards)
+    for i := 0; i < cfg.Shards; i++ {
+        shards[i] = make(chan *model.Sample, cfg.QueueCapacity)
+    }
+
+    t := &StorageQueueManager{
+        cfg:    *cfg,
+        tsdb:   tsdb,
+        shards: shards,
+        done:   make(chan struct{}),
+
+        sentSamplesTotal: prometheus.NewCounterVec(
             prometheus.CounterOpts{
                 Namespace:   namespace,
                 Subsystem:   subsystem,
                 Name:        "sent_samples_total",
-                Help:        "Total number of processed samples to be sent to remote storage.",
+                Help:        "Total number of processed samples sent to remote storage.",
                 ConstLabels: constLabels,
             },
             []string{result},
         ),
-        sendLatency: prometheus.NewSummary(prometheus.SummaryOpts{
-            Namespace:   namespace,
-            Subsystem:   subsystem,
-            Name:        "send_latency_seconds",
-            Help:        "Latency quantiles for sending sample batches to the remote storage.",
-            ConstLabels: constLabels,
-        }),
-        failedBatches: prometheus.NewCounter(prometheus.CounterOpts{
-            Namespace:   namespace,
-            Subsystem:   subsystem,
-            Name:        "failed_batches_total",
-            Help:        "Total number of sample batches that encountered an error while being sent to the remote storage.",
-            ConstLabels: constLabels,
-        }),
-        failedSamples: prometheus.NewCounter(prometheus.CounterOpts{
-            Namespace:   namespace,
-            Subsystem:   subsystem,
-            Name:        "failed_samples_total",
-            Help:        "Total number of samples that encountered an error while being sent to the remote storage.",
-            ConstLabels: constLabels,
-        }),
+        sentBatchDuration: prometheus.NewHistogramVec(
+            prometheus.HistogramOpts{
+                Namespace:   namespace,
+                Subsystem:   subsystem,
+                Name:        "sent_batch_duration_seconds",
+                Help:        "Duration of sample batch send calls to the remote storage.",
+                ConstLabels: constLabels,
+                Buckets:     prometheus.DefBuckets,
+            },
+            []string{result},
+        ),
         queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
@@ -126,119 +128,131 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue
                 constLabels,
             ),
             prometheus.GaugeValue,
-            float64(queueCapacity),
+            float64(cfg.QueueCapacity*cfg.Shards),
         ),
     }
+
+    t.wg.Add(cfg.Shards)
+    return t
 }
 
 // Append queues a sample to be sent to the remote storage. It drops the
 // sample on the floor if the queue is full.
 // Always returns nil.
 func (t *StorageQueueManager) Append(s *model.Sample) error {
+    fp := s.Metric.FastFingerprint()
+    shard := uint64(fp) % uint64(t.cfg.Shards)
+
     select {
-    case t.queue <- s:
+    case t.shards[shard] <- s:
     default:
-        t.samplesCount.WithLabelValues(dropped).Inc()
+        t.sentSamplesTotal.WithLabelValues(dropped).Inc()
         log.Warn("Remote storage queue full, discarding sample.")
     }
     return nil
 }
 
+// NeedsThrottling implements storage.SampleAppender. It will always return
+// false as a remote storage drops samples on the floor if backlogging instead
+// of asking for throttling.
+func (*StorageQueueManager) NeedsThrottling() bool {
+    return false
+}
+
+// Describe implements prometheus.Collector.
+func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) {
+    t.sentSamplesTotal.Describe(ch)
+    t.sentBatchDuration.Describe(ch)
+    ch <- t.queueLength.Desc()
+    ch <- t.queueCapacity.Desc()
+}
+
+// queueLen returns the number of outstanding samples in the queue.
+func (t *StorageQueueManager) queueLen() int {
+    queueLength := 0
+    for _, shard := range t.shards {
+        queueLength += len(shard)
+    }
+    return queueLength
+}
+
+// Collect implements prometheus.Collector.
+func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
+    t.sentSamplesTotal.Collect(ch)
+    t.sentBatchDuration.Collect(ch)
+    t.queueLength.Set(float64(t.queueLen()))
+    ch <- t.queueLength
+    ch <- t.queueCapacity
+}
+
+// Run continuously sends samples to the remote storage.
+func (t *StorageQueueManager) Run() {
+    for i := 0; i < t.cfg.Shards; i++ {
+        go t.runShard(i)
+    }
+    t.wg.Wait()
+}
+
 // Stop stops sending samples to the remote storage and waits for pending
 // sends to complete.
 func (t *StorageQueueManager) Stop() {
     log.Infof("Stopping remote storage...")
-    close(t.queue)
-    <-t.drained
-    for i := 0; i < maxConcurrentSends; i++ {
-        t.sendSemaphore <- true
+    for _, shard := range t.shards {
+        close(shard)
     }
+    t.wg.Wait()
     log.Info("Remote storage stopped.")
 }
 
-// Describe implements prometheus.Collector.
-func (t *StorageQueueManager) Describe(ch chan<- *prometheus.Desc) {
-    t.samplesCount.Describe(ch)
-    t.sendLatency.Describe(ch)
-    ch <- t.failedBatches.Desc()
-    ch <- t.failedSamples.Desc()
-    ch <- t.queueLength.Desc()
-    ch <- t.queueCapacity.Desc()
-}
+func (t *StorageQueueManager) runShard(i int) {
+    defer t.wg.Done()
+    shard := t.shards[i]
 
-// Collect implements prometheus.Collector.
-func (t *StorageQueueManager) Collect(ch chan<- prometheus.Metric) {
-    t.samplesCount.Collect(ch)
-    t.sendLatency.Collect(ch)
-    t.queueLength.Set(float64(len(t.queue)))
-    ch <- t.failedBatches
-    ch <- t.failedSamples
-    ch <- t.queueLength
-    ch <- t.queueCapacity
-}
-
-func (t *StorageQueueManager) sendSamples(s model.Samples) {
-    t.sendSemaphore <- true
-
-    go func() {
-        defer func() {
-            <-t.sendSemaphore
-        }()
-
-        // Samples are sent to the remote storage on a best-effort basis. If a
-        // sample isn't sent correctly the first time, it's simply dropped on the
-        // floor.
-        begin := time.Now()
-        err := t.tsdb.Store(s)
-        duration := time.Since(begin).Seconds()
-
-        labelValue := success
-        if err != nil {
-            log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
-            labelValue = failure
-            t.failedBatches.Inc()
-            t.failedSamples.Add(float64(len(s)))
-        }
-        t.samplesCount.WithLabelValues(labelValue).Add(float64(len(s)))
-        t.sendLatency.Observe(duration)
-    }()
-}
-
-// Run continuously sends samples to the remote storage.
-func (t *StorageQueueManager) Run() {
-    defer func() {
-        close(t.drained)
-    }()
-
-    // Send batches of at most maxSamplesPerSend samples to the remote storage.
+    // Send batches of at most MaxSamplesPerSend samples to the remote storage.
     // If we have fewer samples than that, flush them out after a deadline
     // anyways.
+    pendingSamples := model.Samples{}
+
     for {
         select {
-        case s, ok := <-t.queue:
+        case s, ok := <-shard:
             if !ok {
-                log.Infof("Flushing %d samples to remote storage...", len(t.pendingSamples))
-                t.flush()
-                log.Infof("Done flushing.")
+                if len(pendingSamples) > 0 {
+                    log.Infof("Flushing %d samples to remote storage...", len(pendingSamples))
+                    t.sendSamples(pendingSamples)
+                    log.Infof("Done flushing.")
+                }
                 return
             }
 
-            t.pendingSamples = append(t.pendingSamples, s)
+            pendingSamples = append(pendingSamples, s)
 
-            for len(t.pendingSamples) >= maxSamplesPerSend {
-                t.sendSamples(t.pendingSamples[:maxSamplesPerSend])
-                t.pendingSamples = t.pendingSamples[maxSamplesPerSend:]
+            for len(pendingSamples) >= t.cfg.MaxSamplesPerSend {
+                t.sendSamples(pendingSamples[:t.cfg.MaxSamplesPerSend])
+                pendingSamples = pendingSamples[t.cfg.MaxSamplesPerSend:]
+            }
+        case <-time.After(t.cfg.BatchSendDeadline):
+            if len(pendingSamples) > 0 {
+                t.sendSamples(pendingSamples)
+                pendingSamples = pendingSamples[:0]
             }
-        case <-time.After(batchSendDeadline):
-            t.flush()
         }
     }
 }
 
-// Flush flushes remaining queued samples.
-func (t *StorageQueueManager) flush() {
-    if len(t.pendingSamples) > 0 {
-        t.sendSamples(t.pendingSamples)
+func (t *StorageQueueManager) sendSamples(s model.Samples) {
+    // Samples are sent to the remote storage on a best-effort basis. If a
+    // sample isn't sent correctly the first time, it's simply dropped on the
+    // floor.
+    begin := time.Now()
+    err := t.tsdb.Store(s)
+    duration := time.Since(begin).Seconds()
+
+    labelValue := success
+    if err != nil {
+        log.Warnf("error sending %d samples to remote storage: %s", len(s), err)
+        labelValue = failure
     }
-    t.pendingSamples = t.pendingSamples[:0]
+    t.sentSamplesTotal.WithLabelValues(labelValue).Add(float64(len(s)))
+    t.sentBatchDuration.WithLabelValues(labelValue).Observe(duration)
 }
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index afe23178f..f3bd85aa3 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -14,6 +14,7 @@ package remote
 
 import (
+    "fmt"
     "sync"
     "sync/atomic"
     "testing"
@@ -23,28 +24,53 @@ import (
 )
 
 type TestStorageClient struct {
-    receivedSamples model.Samples
-    expectedSamples model.Samples
+    receivedSamples map[string]model.Samples
+    expectedSamples map[string]model.Samples
     wg              sync.WaitGroup
+    mtx             sync.Mutex
 }
 
-func (c *TestStorageClient) expectSamples(s model.Samples) {
-    c.expectedSamples = append(c.expectedSamples, s...)
-    c.wg.Add(len(s))
+func NewTestStorageClient() *TestStorageClient {
+    return &TestStorageClient{
+        receivedSamples: map[string]model.Samples{},
+        expectedSamples: map[string]model.Samples{},
+    }
+}
+
+func (c *TestStorageClient) expectSamples(ss model.Samples) {
+    c.mtx.Lock()
+    defer c.mtx.Unlock()
+
+    for _, s := range ss {
+        ts := s.Metric.String()
+        c.expectedSamples[ts] = append(c.expectedSamples[ts], s)
+    }
+    c.wg.Add(len(ss))
 }
 
 func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
     c.wg.Wait()
-    for i, expected := range c.expectedSamples {
-        if !expected.Equal(c.receivedSamples[i]) {
-            t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[i])
+
+    c.mtx.Lock()
+    defer c.mtx.Unlock()
+    for ts, expectedSamples := range c.expectedSamples {
+        for i, expected := range expectedSamples {
+            if !expected.Equal(c.receivedSamples[ts][i]) {
+                t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i])
+            }
         }
     }
 }
 
-func (c *TestStorageClient) Store(s model.Samples) error {
-    c.receivedSamples = append(c.receivedSamples, s...)
-    c.wg.Add(-len(s))
+func (c *TestStorageClient) Store(ss model.Samples) error {
+    c.mtx.Lock()
+    defer c.mtx.Unlock()
+
+    for _, s := range ss {
+        ts := s.Metric.String()
+        c.receivedSamples[ts] = append(c.receivedSamples[ts], s)
+    }
+    c.wg.Add(-len(ss))
     return nil
 }
 
@@ -55,21 +81,24 @@ func (c *TestStorageClient) Name() string {
 func TestSampleDelivery(t *testing.T) {
     // Let's create an even number of send batches so we don't run into the
     // batch timeout case.
-    n := maxSamplesPerSend * 2
+    cfg := defaultConfig
+    n := cfg.QueueCapacity * 2
+    cfg.Shards = 1
 
     samples := make(model.Samples, 0, n)
     for i := 0; i < n; i++ {
+        name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
         samples = append(samples, &model.Sample{
             Metric: model.Metric{
-                model.MetricNameLabel: "test_metric",
+                model.MetricNameLabel: name,
             },
             Value: model.SampleValue(i),
         })
     }
 
-    c := &TestStorageClient{}
+    c := NewTestStorageClient()
     c.expectSamples(samples[:len(samples)/2])
-    m := NewStorageQueueManager(c, len(samples)/2)
+    m := NewStorageQueueManager(c, &cfg)
 
     // These should be received by the client.
     for _, s := range samples[:len(samples)/2] {
@@ -85,6 +114,39 @@ func TestSampleDelivery(t *testing.T) {
     c.waitForExpectedSamples(t)
 }
 
+func TestSampleDeliveryOrder(t *testing.T) {
+    cfg := defaultConfig
+    ts := 10
+    n := cfg.MaxSamplesPerSend * ts
+    // Ensure we don't drop samples in this test.
+    cfg.QueueCapacity = n
+
+    samples := make(model.Samples, 0, n)
+    for i := 0; i < n; i++ {
+        name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts))
+        samples = append(samples, &model.Sample{
+            Metric: model.Metric{
+                model.MetricNameLabel: name,
+            },
+            Value:     model.SampleValue(i),
+            Timestamp: model.Time(i),
+        })
+    }
+
+    c := NewTestStorageClient()
+    c.expectSamples(samples)
+    m := NewStorageQueueManager(c, &cfg)
+
+    // These should be received by the client.
+    for _, s := range samples {
+        m.Append(s)
+    }
+    go m.Run()
+    defer m.Stop()
+
+    c.waitForExpectedSamples(t)
+}
+
 // TestBlockingStorageClient is a queue_manager StorageClient which will block
 // on any calls to Store(), until the `block` channel is closed, at which point
 // the `numCalls` property will contain a count of how many times Store() was
@@ -121,24 +183,26 @@ func (c *TestBlockingStorageClient) Name() string {
 
 func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
     // Our goal is to fully empty the queue:
-    // `maxSamplesPerSend*maxConcurrentSends` samples should be consumed by the
-    // semaphore-controlled goroutines, and then another `maxSamplesPerSend`
-    // should be consumed by the Run() loop calling sendSample and immediately
-    // blocking.
-    n := maxSamplesPerSend*maxConcurrentSends + maxSamplesPerSend
+    // `MaxSamplesPerSend*Shards` samples should be consumed by the
+    // per-shard goroutines, and then another `MaxSamplesPerSend`
+    // should be left on the queue.
+    cfg := defaultConfig
+    n := cfg.MaxSamplesPerSend*cfg.Shards + cfg.MaxSamplesPerSend
+    cfg.QueueCapacity = n
 
     samples := make(model.Samples, 0, n)
     for i := 0; i < n; i++ {
+        name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
         samples = append(samples, &model.Sample{
             Metric: model.Metric{
-                model.MetricNameLabel: "test_metric",
+                model.MetricNameLabel: name,
             },
             Value: model.SampleValue(i),
         })
     }
 
     c := NewTestBlockedStorageClient()
-    m := NewStorageQueueManager(c, n)
+    m := NewStorageQueueManager(c, &cfg)
 
     go m.Run()
 
@@ -151,7 +215,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
         m.Append(s)
     }
 
-    // Wait until the Run() loop drains the queue. If things went right, it
+    // Wait until the runShard() loops drain the queue. If things went right, it
     // should then immediately block in sendSamples(), but, in case of error,
     // it would spawn too many goroutines, and thus we'd see more calls to
     // client.Store()
@@ -163,19 +227,18 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
     // draining the queue. We cap the waiting at 1 second -- that should give
     // plenty of time, and keeps the failure fairly quick if we're not draining
     // the queue properly.
-    for i := 0; i < 100 && len(m.queue) > 0; i++ {
+    for i := 0; i < 100 && m.queueLen() > 0; i++ {
         time.Sleep(10 * time.Millisecond)
     }
 
-    if len(m.queue) > 0 {
+    if m.queueLen() != cfg.MaxSamplesPerSend {
         t.Fatalf("Failed to drain StorageQueueManager queue, %d elements left",
-            len(m.queue),
+            m.queueLen(),
         )
     }
 
     numCalls := c.NumCalls()
-    if numCalls != maxConcurrentSends {
-        t.Errorf("Saw %d concurrent sends, expected %d", numCalls, maxConcurrentSends)
+    if numCalls != uint64(cfg.Shards) {
+        t.Errorf("Saw %d concurrent sends, expected %d", numCalls, cfg.Shards)
     }
-
 }
diff --git a/storage/remote/remote.go b/storage/remote/remote.go
index 2c1923b79..91c25c886 100644
--- a/storage/remote/remote.go
+++ b/storage/remote/remote.go
@@ -52,11 +52,11 @@ func New(o *Options) (*Storage, error) {
         c := graphite.NewClient(
             o.GraphiteAddress, o.GraphiteTransport,
             o.StorageTimeout, o.GraphitePrefix)
-        s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
+        s.queues = append(s.queues, NewStorageQueueManager(c, nil))
     }
     if o.OpentsdbURL != "" {
         c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout)
-        s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
+        s.queues = append(s.queues, NewStorageQueueManager(c, nil))
     }
     if o.InfluxdbURL != nil {
         conf := influx.Config{
@@ -67,14 +67,14 @@ func New(o *Options) (*Storage, error) {
         }
         c := influxdb.NewClient(conf, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy)
         prometheus.MustRegister(c)
-        s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
+        s.queues = append(s.queues, NewStorageQueueManager(c, nil))
     }
     if o.Address != "" {
         c, err := NewClient(o.Address, o.StorageTimeout)
         if err != nil {
             return nil, err
         }
-        s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
+        s.queues = append(s.queues, NewStorageQueueManager(c, nil))
     }
     if len(s.queues) == 0 {
         return nil, nil
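
Not part of the patch: a minimal usage sketch of the new config-driven constructor, assuming it sits alongside the code above in the storage/remote package. The loggingClient type and usageSketch function are hypothetical names used only for illustration; the StorageQueueManager API (NewStorageQueueManager, Run, Append, Stop) and defaultConfig are taken from the diff.

package remote

import (
    "time"

    "github.com/prometheus/common/log"
    "github.com/prometheus/common/model"
)

// loggingClient is a hypothetical StorageClient that only logs batch sizes.
type loggingClient struct{}

func (c *loggingClient) Store(s model.Samples) error {
    log.Infof("would send %d samples", len(s))
    return nil
}

func (c *loggingClient) Name() string { return "logging" }

func usageSketch() {
    cfg := defaultConfig // copy the package defaults...
    cfg.Shards = 4       // ...and override the level of concurrency
    cfg.BatchSendDeadline = time.Second

    m := NewStorageQueueManager(&loggingClient{}, &cfg)
    go m.Run() // one goroutine per shard, each draining its own channel

    // Append never blocks: the sample is dropped if its shard's buffer is full.
    m.Append(&model.Sample{
        Metric:    model.Metric{model.MetricNameLabel: "example_metric"},
        Value:     1,
        Timestamp: model.Now(),
    })

    // Stop closes the shard channels, flushes pending samples and waits for
    // the shard goroutines to exit.
    m.Stop()
}

Passing nil instead of &cfg picks up defaultConfig, which is what remote.go now does for all four built-in clients. Because Append routes each sample by Metric.FastFingerprint() modulo cfg.Shards, all samples of a given time series land on the same shard, preserving per-series ordering while still allowing up to Shards concurrent Store calls.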