diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 22da78dca..17ad7525e 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -522,8 +522,11 @@ outer:
 			continue
 		}
 		t.seriesMtx.Unlock()
-		// This will only loop if the queues are being resharded.
-		backoff := t.cfg.MinBackoff
+		// Start with a very small backoff. This should not be t.cfg.MinBackoff
+		// as it can happen without errors, and we want to pick up work after
+		// filling a queue/resharding as quickly as possible.
+		// TODO: Consider using the average duration of a request as the backoff.
+		backoff := model.Duration(5 * time.Millisecond)
 		for {
 			select {
 			case <-t.quit:
@@ -542,6 +545,8 @@ outer:
 			t.metrics.enqueueRetriesTotal.Inc()
 			time.Sleep(time.Duration(backoff))
 			backoff = backoff * 2
+			// It is reasonable to use t.cfg.MaxBackoff here, as if we have hit
+			// the full backoff we are likely waiting for external resources.
 			if backoff > t.cfg.MaxBackoff {
 				backoff = t.cfg.MaxBackoff
 			}
@@ -906,8 +911,7 @@ func (t *QueueManager) newShards() *shards {
 }
 
 type shards struct {
-	mtx      sync.RWMutex // With the WAL, this is never actually contended.
-	writeMtx sync.Mutex
+	mtx sync.RWMutex // With the WAL, this is never actually contended.
 
 	qm     *QueueManager
 	queues []*queue
@@ -994,26 +998,21 @@ func (s *shards) stop() {
 	}
 }
 
-// enqueue data (sample or exemplar). If we are currently in the process of shutting down or resharding,
-// will return false; in this case, you should back off and retry.
+// enqueue data (sample or exemplar). If the shard is full, shutting down, or
+// resharding, it will return false; in this case, you should back off and
+// retry. A shard is full when its configured capacity has been reached;
+// specifically, when s.queues[shard] has filled its batchQueue channel and the
+// partial batch has also been filled.
 func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
-	s.writeMtx.Lock()
-	defer s.writeMtx.Unlock()
-
-	select {
-	case <-s.softShutdown:
-		return false
-	default:
-	}
 
 	shard := uint64(ref) % uint64(len(s.queues))
 	select {
 	case <-s.softShutdown:
 		return false
 	default:
-		appended := s.queues[shard].Append(data, s.softShutdown)
+		appended := s.queues[shard].Append(data)
 		if !appended {
 			return false
 		}
@@ -1029,9 +1028,7 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data sampleOrExemplar) bool {
 }
 
 type queue struct {
-	// batchMtx covers sending to the batchQueue and batch operations other
-	// than appending a sample. It is mainly to make sure (*queue).Batch() and
-	// (*queue).FlushAndShutdown() are not called concurrently.
+	// batchMtx covers operations appending to or publishing the partial batch.
 	batchMtx   sync.Mutex
 	batch      []sampleOrExemplar
 	batchQueue chan []sampleOrExemplar
@@ -1053,6 +1050,11 @@ type sampleOrExemplar struct {
 
 func newQueue(batchSize, capacity int) *queue {
 	batches := capacity / batchSize
+	// Never create an unbuffered channel, even if capacity is configured to
+	// be less than max_samples_per_send.
+	if batches == 0 {
+		batches = 1
+	}
 	return &queue{
 		batch:      make([]sampleOrExemplar, 0, batchSize),
 		batchQueue: make(chan []sampleOrExemplar, batches),
@@ -1062,14 +1064,18 @@ func newQueue(batchSize, capacity int) *queue {
 	}
 }
 
-func (q *queue) Append(datum sampleOrExemplar, stop <-chan struct{}) bool {
+// Append the sampleOrExemplar to the buffered batch. Returns false if it
+// cannot be added and must be retried.
+func (q *queue) Append(datum sampleOrExemplar) bool {
+	q.batchMtx.Lock()
+	defer q.batchMtx.Unlock()
 	q.batch = append(q.batch, datum)
 	if len(q.batch) == cap(q.batch) {
 		select {
 		case q.batchQueue <- q.batch:
 			q.batch = q.newBatch(cap(q.batch))
 			return true
-		case <-stop:
+		default:
 			// Remove the sample we just appended. It will get retried.
 			q.batch = q.batch[:len(q.batch)-1]
 			return false
@@ -1082,15 +1088,19 @@ func (q *queue) Chan() <-chan []sampleOrExemplar {
 	return q.batchQueue
 }
 
-// Batch returns the current batch and allocates a new batch. Must not be
-// called concurrently with Append.
+// Batch returns the current batch and allocates a new batch.
func (q *queue) Batch() []sampleOrExemplar {
 	q.batchMtx.Lock()
 	defer q.batchMtx.Unlock()
 
-	batch := q.batch
-	q.batch = q.newBatch(cap(batch))
-	return batch
+	select {
+	case batch := <-q.batchQueue:
+		return batch
+	default:
+		batch := q.batch
+		q.batch = q.newBatch(cap(batch))
+		return batch
+	}
 }
 
 // ReturnForReuse adds the batch buffer back to the internal pool.
@@ -1201,22 +1211,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) {
 			timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline))
 
 		case <-timer.C:
-			// We need to take the write lock when getting a batch to avoid
-			// concurrent Appends. Generally this will only happen on low
-			// traffic instances or during resharding. We have to use writeMtx
-			// and not the batchMtx on a queue because we do not want to have
-			// to lock each queue for each sample, and cannot call
-			// queue.Batch() while an Append happens.
-			s.writeMtx.Lock()
-			// First, we need to see if we can happen to get a batch from the
-			// queue if it filled while acquiring the lock.
-			var batch []sampleOrExemplar
-			select {
-			case batch = <-batchQueue:
-			default:
-				batch = queue.Batch()
-			}
-			s.writeMtx.Unlock()
+			batch := queue.Batch()
 			if len(batch) > 0 {
 				nPendingSamples, nPendingExemplars := s.populateTimeSeries(batch, pendingData)
 				n := nPendingSamples + nPendingExemplars
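The queue_manager.go changes above replace the shards-level writeMtx with a per-queue batchMtx that guards only the partial batch, plus a non-blocking send into batchQueue: a full queue now surfaces as Append returning false, instead of a goroutine blocking while another lock is held. The standalone sketch below illustrates that pattern under toy names (toyQueue, ints instead of sampleOrExemplar); it is an illustration of the locking scheme, not the Prometheus implementation.

// Sketch only: mirrors the batchMtx + non-blocking-send pattern from the
// patch above. toyQueue and its types are hypothetical stand-ins.
package main

import (
	"fmt"
	"sync"
)

type toyQueue struct {
	batchMtx   sync.Mutex // guards only the partial batch
	batch      []int
	batchQueue chan []int // completed batches awaiting a sender
}

func newToyQueue(batchSize, capacity int) *toyQueue {
	batches := capacity / batchSize
	if batches == 0 {
		batches = 1 // never an unbuffered channel
	}
	return &toyQueue{
		batch:      make([]int, 0, batchSize),
		batchQueue: make(chan []int, batches),
	}
}

// Append adds one datum; returns false when the queue is full so the
// caller can back off and retry.
func (q *toyQueue) Append(v int) bool {
	q.batchMtx.Lock()
	defer q.batchMtx.Unlock()
	q.batch = append(q.batch, v)
	if len(q.batch) == cap(q.batch) {
		select {
		case q.batchQueue <- q.batch:
			q.batch = make([]int, 0, cap(q.batch))
		default:
			// Channel full: roll back the sample we just appended.
			q.batch = q.batch[:len(q.batch)-1]
			return false
		}
	}
	return true
}

// Batch prefers an already-completed batch, else takes the partial one.
func (q *toyQueue) Batch() []int {
	q.batchMtx.Lock()
	defer q.batchMtx.Unlock()
	select {
	case b := <-q.batchQueue:
		return b
	default:
		b := q.batch
		q.batch = make([]int, 0, cap(b))
		return b
	}
}

func main() {
	q := newToyQueue(2, 4)
	for i := 0; i < 7; i++ {
		fmt.Println(i, q.Append(i)) // 5 and 6 report false: queue is full
	}
	fmt.Println("batch:", q.Batch())
}

Because Append and Batch share one mutex and neither ever blocks while holding it, the timer-driven flush in runShard can simply call queue.Batch(): the select over batchQueue inside Batch replaces the racy two-step the old runShard code performed under writeMtx.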
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 14fbb324d..15279b074 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -20,6 +20,7 @@ import (
 	"math"
 	"net/url"
 	"os"
+	"runtime/pprof"
 	"sort"
 	"strconv"
 	"strings"
@@ -413,6 +414,7 @@ func TestReshardPartialBatch(t *testing.T) {
 		case <-done:
 		case <-time.After(2 * time.Second):
 			t.Error("Deadlock between sending and stopping detected")
+			pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
 			t.FailNow()
 		}
 	}
@@ -420,6 +422,47 @@
 	m.Stop()
 }
 
+// TestQueueFilledDeadlock makes sure the code does not deadlock when a large
+// scrape (> capacity + max samples per send) is appended at the same time as
+// a batch times out according to the batch send deadline.
+func TestQueueFilledDeadlock(t *testing.T) {
+	samples, series := createTimeseries(50, 1)
+
+	c := NewNopWriteClient()
+
+	cfg := config.DefaultQueueConfig
+	mcfg := config.DefaultMetadataConfig
+	cfg.MaxShards = 1
+	cfg.MaxSamplesPerSend = 10
+	cfg.Capacity = 20
+	flushDeadline := time.Second
+	batchSendDeadline := time.Millisecond
+	cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
+
+	metrics := newQueueManagerMetrics(nil, "", "")
+
+	m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
+	m.StoreSeries(series, 0)
+	m.Start()
+	defer m.Stop()
+
+	for i := 0; i < 100; i++ {
+		done := make(chan struct{})
+		go func() {
+			time.Sleep(batchSendDeadline)
+			m.Append(samples)
+			done <- struct{}{}
+		}()
+		select {
+		case <-done:
+		case <-time.After(2 * time.Second):
+			t.Error("Deadlock between sending and appending detected")
+			pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
+			t.FailNow()
+		}
+	}
+}
+
 func TestReleaseNoninternedString(t *testing.T) {
 	cfg := config.DefaultQueueConfig
 	mcfg := config.DefaultMetadataConfig
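When a shard's queue is full, enqueue returns false and QueueManager.Append retries with the capped exponential backoff added in the first hunk. A minimal sketch of that loop, under hypothetical names (enqueueWithBackoff, tryEnqueue) rather than Prometheus APIs:

// Sketch only: caller-side retry with a deliberately tiny starting backoff,
// doubled on each failed enqueue and clamped at a maximum.
package main

import (
	"fmt"
	"time"
)

func enqueueWithBackoff(tryEnqueue func() bool, maxBackoff time.Duration) {
	backoff := 5 * time.Millisecond // small: a full queue is not an error
	for !tryEnqueue() {
		time.Sleep(backoff)
		backoff *= 2
		if backoff > maxBackoff {
			// At the cap we are likely waiting on an external resource
			// (e.g. the remote endpoint), so stop growing.
			backoff = maxBackoff
		}
	}
}

func main() {
	attempts := 0
	enqueueWithBackoff(func() bool {
		attempts++
		return attempts > 4 // succeed on the fifth try
	}, 100*time.Millisecond)
	fmt.Println("enqueued after", attempts, "attempts")
}

Starting at 5ms rather than MinBackoff matters because a full queue is a normal occurrence rather than an error, so work should be picked up again quickly once the queue drains.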