Write chunks via queue, predicting the refs (#10051)

* Write chunks via queue, predicting the refs

Our load tests have shown that there is a latency spike in the
remote write handler whenever the head chunks need to be written,
because chunkDiskMapper.WriteChunk() blocks until the chunks are written
to disk.

This adds a queue to the chunk disk mapper which makes the WriteChunk()
method non-blocking unless the queue is full. Reads can still be served
from the queue.
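
For illustration, a minimal sketch of the caller-facing API after this change
(the directory path and series values are hypothetical; the names come from the
diff below, and write errors now surface asynchronously through the callback,
mirroring handleChunkWriteError in tsdb/head_append.go):

package main

import (
	"log"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

func main() {
	cdm, err := chunks.NewChunkDiskMapper(nil, "/tmp/head_chunks", chunkenc.NewPool(),
		chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
	if err != nil {
		log.Fatal(err)
	}
	defer cdm.Close()

	// NewChunkDiskMapper requires one IterateAllChunks call before use.
	if err := cdm.IterateAllChunks(func(chunks.HeadSeriesRef, chunks.ChunkDiskMapperRef, int64, int64, uint16) error {
		return nil
	}); err != nil {
		log.Fatal(err)
	}

	// The chunk ref is predicted and returned immediately; the disk write
	// happens asynchronously and its error, if any, arrives via the callback.
	ref := cdm.WriteChunk(chunks.HeadSeriesRef(1), 0, 1000, chunkenc.NewXORChunk(), func(err error) {
		if err != nil && err != chunks.ErrChunkDiskMapperClosed {
			log.Println("chunk write failed:", err)
		}
	})
	_ = ref // Usable right away, e.g. recorded in the series' mmappedChunks.
}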

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* address PR feedback

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* initialize metrics without .Add(0)

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* change isRunningMtx to normal lock

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* do not re-initialize chunkrefmap

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* update metric outside of lock scope

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* add benchmark for adding job to chunk write queue

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* remove unnecessary "success" var

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* gofumpt -extra

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* avoid WithLabelValues call in addJob

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* format comments

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* addressing PR feedback

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* rename cutExpectRef to cutAndExpectRef

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* use head.Init() instead of .initTime()

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* address PR feedback

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* PR feedback

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* update test according to PR feedback

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* replace callbackWg -> awaitCb

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* better test of truncation with empty files

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* replace callbackWg -> awaitCb

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
9 changed files:

  1. tsdb/chunks/chunk_write_queue.go (165 changed lines)
  2. tsdb/chunks/chunk_write_queue_test.go (287 changed lines)
  3. tsdb/chunks/head_chunks.go (223 changed lines)
  4. tsdb/chunks/head_chunks_test.go (151 changed lines)
  5. tsdb/db.go (8 changed lines)
  6. tsdb/db_test.go (10 changed lines)
  7. tsdb/head.go (5 changed lines)
  8. tsdb/head_append.go (13 changed lines)
  9. tsdb/head_test.go (206 changed lines)

tsdb/chunks/chunk_write_queue.go (165 changed lines)

@ -0,0 +1,165 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
"errors"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
type chunkWriteJob struct {
cutFile bool
seriesRef HeadSeriesRef
mint int64
maxt int64
chk chunkenc.Chunk
ref ChunkDiskMapperRef
callback func(error)
}
// chunkWriteQueue is a queue for writing chunks to disk in a non-blocking fashion.
// Chunks that shall be written get added to the queue, which is consumed asynchronously.
// Adding jobs to the queue is non-blocking as long as the queue isn't full.
type chunkWriteQueue struct {
jobs chan chunkWriteJob
chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
isRunningMtx sync.Mutex // Protects the isRunning property.
isRunning bool // Used to prevent new jobs from being added to the queue when the chan is already closed.
workerWg sync.WaitGroup
writeChunk writeChunkF
// Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical
// addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec.
adds prometheus.Counter
gets prometheus.Counter
completed prometheus.Counter
}
// writeChunkF is a function which writes chunks; it is dynamic to allow mocking in tests.
type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool) error
func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChunkF) *chunkWriteQueue {
counters := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "prometheus_tsdb_chunk_write_queue_operations_total",
Help: "Number of operations on the chunk_write_queue.",
},
[]string{"operation"},
)
q := &chunkWriteQueue{
jobs: make(chan chunkWriteJob, size),
chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size),
writeChunk: writeChunk,
adds: counters.WithLabelValues("add"),
gets: counters.WithLabelValues("get"),
completed: counters.WithLabelValues("complete"),
}
if reg != nil {
reg.MustRegister(counters)
}
q.start()
return q
}
func (c *chunkWriteQueue) start() {
c.workerWg.Add(1)
go func() {
defer c.workerWg.Done()
for job := range c.jobs {
c.processJob(job)
}
}()
c.isRunningMtx.Lock()
c.isRunning = true
c.isRunningMtx.Unlock()
}
func (c *chunkWriteQueue) processJob(job chunkWriteJob) {
err := c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.cutFile)
if job.callback != nil {
job.callback(err)
}
c.chunkRefMapMtx.Lock()
defer c.chunkRefMapMtx.Unlock()
delete(c.chunkRefMap, job.ref)
c.completed.Inc()
}
func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
defer func() {
if err == nil {
c.adds.Inc()
}
}()
c.isRunningMtx.Lock()
defer c.isRunningMtx.Unlock()
if !c.isRunning {
return errors.New("queue is not started")
}
c.chunkRefMapMtx.Lock()
c.chunkRefMap[job.ref] = job.chk
c.chunkRefMapMtx.Unlock()
c.jobs <- job
return nil
}
func (c *chunkWriteQueue) get(ref ChunkDiskMapperRef) chunkenc.Chunk {
c.chunkRefMapMtx.RLock()
defer c.chunkRefMapMtx.RUnlock()
chk, ok := c.chunkRefMap[ref]
if ok {
c.gets.Inc()
}
return chk
}
func (c *chunkWriteQueue) stop() {
c.isRunningMtx.Lock()
defer c.isRunningMtx.Unlock()
if !c.isRunning {
return
}
c.isRunning = false
close(c.jobs)
c.workerWg.Wait()
}
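
A compact, package-internal sketch of how this queue is driven; the tests below
exercise the same pattern more thoroughly (the job values here are placeholders):

package chunks

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func exampleQueueUsage() {
	// The write function runs asynchronously in the worker goroutine.
	writeChunk := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _ bool) error {
		return nil // Pretend the chunk was flushed to disk.
	}
	q := newChunkWriteQueue(nil, 1000, writeChunk)
	defer q.stop() // Closes the jobs chan and waits for the worker to drain it.

	ref := newChunkDiskMapperRef(0, SegmentHeaderSize)
	done := make(chan struct{})
	_ = q.addJob(chunkWriteJob{
		chk: chunkenc.NewXORChunk(),
		ref: ref,
		callback: func(err error) {
			fmt.Println("write completed, err =", err)
			close(done)
		},
	})
	// Until the worker finishes the job, reads are served from chunkRefMap.
	if chk := q.get(ref); chk != nil {
		fmt.Println("served pending chunk from the queue")
	}
	<-done
}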

tsdb/chunks/chunk_write_queue_test.go (287 changed lines)

@ -0,0 +1,287 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
"errors"
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
func TestChunkWriteQueue_GettingChunkFromQueue(t *testing.T) {
var blockWriterWg sync.WaitGroup
blockWriterWg.Add(1)
// blockingChunkWriter blocks until blockWriterWg is done.
blockingChunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _ bool) error {
blockWriterWg.Wait()
return nil
}
q := newChunkWriteQueue(nil, 1000, blockingChunkWriter)
defer q.stop()
defer blockWriterWg.Done()
testChunk := chunkenc.NewXORChunk()
var ref ChunkDiskMapperRef
job := chunkWriteJob{
chk: testChunk,
ref: ref,
}
require.NoError(t, q.addJob(job))
// Retrieve chunk from the queue.
gotChunk := q.get(ref)
require.Equal(t, testChunk, gotChunk)
}
func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) {
var (
gotSeriesRef HeadSeriesRef
gotMint, gotMaxt int64
gotChunk chunkenc.Chunk
gotRef ChunkDiskMapperRef
gotCutFile bool
)
blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) error {
gotSeriesRef = seriesRef
gotMint = mint
gotMaxt = maxt
gotChunk = chunk
gotRef = ref
gotCutFile = cutFile
return nil
}
q := newChunkWriteQueue(nil, 1000, blockingChunkWriter)
defer q.stop()
seriesRef := HeadSeriesRef(1)
var mint, maxt int64 = 2, 3
chunk := chunkenc.NewXORChunk()
ref := newChunkDiskMapperRef(321, 123)
cutFile := true
awaitCb := make(chan struct{})
require.NoError(t, q.addJob(chunkWriteJob{seriesRef: seriesRef, mint: mint, maxt: maxt, chk: chunk, ref: ref, cutFile: cutFile, callback: func(err error) {
close(awaitCb)
}}))
<-awaitCb
// Check that the write function received all job attributes correctly.
require.Equal(t, seriesRef, gotSeriesRef)
require.Equal(t, mint, gotMint)
require.Equal(t, maxt, gotMaxt)
require.Equal(t, chunk, gotChunk)
require.Equal(t, ref, gotRef)
require.Equal(t, cutFile, gotCutFile)
}
func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
sizeLimit := 100
unblockChunkWriterCh := make(chan struct{}, sizeLimit)
// blockingChunkWriter blocks until the unblockChunkWriterCh channel returns a value.
blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) error {
<-unblockChunkWriterCh
return nil
}
q := newChunkWriteQueue(nil, sizeLimit, blockingChunkWriter)
defer q.stop()
// Unblock writers when shutting down.
defer close(unblockChunkWriterCh)
var chunkRef ChunkDiskMapperRef
var callbackWg sync.WaitGroup
addChunk := func() {
callbackWg.Add(1)
require.NoError(t, q.addJob(chunkWriteJob{
ref: chunkRef,
callback: func(err error) {
callbackWg.Done()
},
}))
chunkRef++
}
unblockChunkWriter := func() {
unblockChunkWriterCh <- struct{}{}
}
// Fill the queue to half of its size limit.
for job := 0; job < sizeLimit/2; job++ {
addChunk()
}
// Consume the jobs.
for job := 0; job < sizeLimit/2; job++ {
unblockChunkWriter()
}
// Add jobs until the queue is full.
// Note that one more job than <sizeLimit> can be added because one will be processed by the worker already
// and it will block on the chunk write function.
for job := 0; job < sizeLimit+1; job++ {
addChunk()
}
// The queue should be full.
require.True(t, queueIsFull(q))
// Adding another job should block as long as no job from the queue gets consumed.
addedJob := atomic.NewBool(false)
go func() {
addChunk()
addedJob.Store(true)
}()
// Wait for 10ms while adding a new job is blocked.
time.Sleep(time.Millisecond * 10)
require.False(t, addedJob.Load())
// Consume one job from the queue.
unblockChunkWriter()
// Wait until the job has been added to the queue.
require.Eventually(t, func() bool { return addedJob.Load() }, time.Second, time.Millisecond*10)
// The queue should be full again.
require.True(t, queueIsFull(q))
// Consume <sizeLimit>+1 jobs from the queue.
// To drain the queue we need to consume <sizeLimit>+1 jobs because 1 job
// is already being processed.
for job := 0; job < sizeLimit+1; job++ {
require.False(t, queueIsEmpty(q))
unblockChunkWriter()
}
// Wait until all jobs have been processed.
callbackWg.Wait()
require.True(t, queueIsEmpty(q))
}
func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
testError := errors.New("test error")
chunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _ bool) error {
return testError
}
awaitCb := make(chan struct{})
var gotError error
callback := func(err error) {
gotError = err
close(awaitCb)
}
q := newChunkWriteQueue(nil, 1, chunkWriter)
defer q.stop()
job := chunkWriteJob{callback: callback}
require.NoError(t, q.addJob(job))
<-awaitCb
require.Equal(t, testError, gotError)
}
func BenchmarkChunkWriteQueue_addJob(b *testing.B) {
for _, withReads := range []bool{false, true} {
b.Run(fmt.Sprintf("with reads %t", withReads), func(b *testing.B) {
for _, concurrentWrites := range []int{1, 10, 100, 1000} {
b.Run(fmt.Sprintf("%d concurrent writes", concurrentWrites), func(b *testing.B) {
issueReadSignal := make(chan struct{})
q := newChunkWriteQueue(nil, 1000, func(ref HeadSeriesRef, i, i2 int64, chunk chunkenc.Chunk, ref2 ChunkDiskMapperRef, b bool) error {
if withReads {
select {
case issueReadSignal <- struct{}{}:
default:
// Can't write to issueReadSignal; don't block, skip the read instead.
}
}
return nil
})
b.Cleanup(func() {
// Stopped already, so no more writes will happen.
close(issueReadSignal)
})
b.Cleanup(q.stop)
start := sync.WaitGroup{}
start.Add(1)
jobs := make(chan chunkWriteJob, b.N)
for i := 0; i < b.N; i++ {
jobs <- chunkWriteJob{
seriesRef: HeadSeriesRef(i),
ref: ChunkDiskMapperRef(i),
}
}
close(jobs)
go func() {
for range issueReadSignal {
// We don't care about the ID we're getting, we just want to grab the lock.
_ = q.get(ChunkDiskMapperRef(0))
}
}()
done := sync.WaitGroup{}
done.Add(concurrentWrites)
for w := 0; w < concurrentWrites; w++ {
go func() {
start.Wait()
for j := range jobs {
_ = q.addJob(j)
}
done.Done()
}()
}
b.ResetTimer()
start.Done()
done.Wait()
})
}
})
}
}
func queueIsEmpty(q *chunkWriteQueue) bool {
return queueSize(q) == 0
}
func queueIsFull(q *chunkWriteQueue) bool {
// When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobs chan
// because one job is currently being processed and blocked in the writer.
return queueSize(q) == cap(q.jobs)+1
}
func queueSize(q *chunkWriteQueue) int {
q.chunkRefMapMtx.Lock()
defer q.chunkRefMapMtx.Unlock()
// Looking at chunkRefMap instead of the jobs chan because a job is popped from the chan before it has
// been fully processed; it remains in the chunkRefMap until the processing is complete.
return len(q.chunkRefMap)
}

tsdb/chunks/head_chunks.go (223 changed lines)

@ -26,7 +26,9 @@ import (
"strconv"
"sync"
"github.com/dennwc/varint"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/tsdb/chunkenc"
@ -66,6 +68,8 @@ const (
MaxWriteBufferSize = 8 * 1024 * 1024 // 8 MiB.
// DefaultWriteBufferSize is the default write buffer size.
DefaultWriteBufferSize = 4 * 1024 * 1024 // 4 MiB.
// DefaultWriteQueueSize is the default size of the in-memory queue used before flushing chunks to the disk.
DefaultWriteQueueSize = 1000
)
// ChunkDiskMapperRef represents the location of a head chunk on disk.
@ -77,10 +81,10 @@ func newChunkDiskMapperRef(seq, offset uint64) ChunkDiskMapperRef {
return ChunkDiskMapperRef((seq << 32) | offset)
}
func (ref ChunkDiskMapperRef) Unpack() (sgmIndex, chkStart int) {
sgmIndex = int(ref >> 32)
chkStart = int((ref << 32) >> 32)
return sgmIndex, chkStart
func (ref ChunkDiskMapperRef) Unpack() (seq, offset int) {
seq = int(ref >> 32)
offset = int((ref << 32) >> 32)
return seq, offset
}
// CorruptionErr is an error that's returned when corruption is encountered.
@ -94,19 +98,98 @@ func (e *CorruptionErr) Error() string {
return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error()
}
// chunkPos keeps track of the position in the head chunk files.
// chunkPos is not thread-safe; a lock must be used to protect it.
type chunkPos struct {
seq uint64 // Index of chunk file.
offset uint64 // Offset within chunk file.
cutFile bool // When true then the next chunk will be written to a new file.
}
// getNextChunkRef takes a chunk and returns the chunk reference which will refer to it once it has been written.
// getNextChunkRef also decides whether a new file should be cut before writing this chunk, and it returns the decision via the second return value.
// The order of calling getNextChunkRef must be the order in which chunks are written to the disk.
func (f *chunkPos) getNextChunkRef(chk chunkenc.Chunk) (chkRef ChunkDiskMapperRef, cutFile bool) {
chkLen := uint64(len(chk.Bytes()))
bytesToWrite := f.bytesToWriteForChunk(chkLen)
if f.shouldCutNewFile(chkLen) {
f.toNewFile()
f.cutFile = false
cutFile = true
}
chkOffset := f.offset
f.offset += bytesToWrite
return newChunkDiskMapperRef(f.seq, chkOffset), cutFile
}
// toNewFile updates the seq/offset position to point to the beginning of a new chunk file.
func (f *chunkPos) toNewFile() {
f.seq++
f.offset = SegmentHeaderSize
}
// cutFileOnNextChunk ensures that the next chunk will be written to a new file.
// Not thread safe, a lock must be held when calling this.
func (f *chunkPos) cutFileOnNextChunk() {
f.cutFile = true
}
// initSeq sets the sequence number of the head chunk file.
// Should only be used for initialization; after that the sequence number will be managed by chunkPos.
func (f *chunkPos) initSeq(seq uint64) {
f.seq = seq
}
// shouldCutNewFile returns whether a new file should be cut based on the file size.
// The read or write lock on chunkPos must be held when calling this.
func (f *chunkPos) shouldCutNewFile(chunkSize uint64) bool {
if f.cutFile {
return true
}
return f.offset == 0 || // First head chunk file.
f.offset+chunkSize+MaxHeadChunkMetaSize > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
}
// bytesToWriteForChunk returns the number of bytes that will need to be written for the given chunk size,
// including all metadata before and after the chunk data.
// Head chunk format: https://github.com/prometheus/prometheus/blob/main/tsdb/docs/format/head_chunks.md#chunk
func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 {
// Headers.
bytes := uint64(SeriesRefSize) + 2*MintMaxtSize + ChunkEncodingSize
// Size of chunk length encoded as uvarint.
bytes += uint64(varint.UvarintSize(chkLen))
// Chunk length.
bytes += chkLen
// crc32.
bytes += CRCSize
return bytes
}
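// A worked example of the byte count above: for a 100-byte chunk, with
// SeriesRefSize=8, MintMaxtSize=8, ChunkEncodingSize=1 and CRCSize=4 as
// defined in this package, and uvarint(100) taking one byte:
//   8 + 2*8 + 1 + 1 + 100 + 4 = 130 bytes written for the chunk.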
// ChunkDiskMapper is for writing the Head block chunks to the disk
// and accessing chunks via mmapped files.
type ChunkDiskMapper struct {
curFileNumBytes atomic.Int64 // Bytes written in current open file.
// Writer.
dir *os.File
writeBufferSize int
curFile *os.File // File being written to.
curFileSequence int // Index of current open file being appended to.
curFileOffset atomic.Uint64 // Bytes written in current open file.
curFileMaxt int64 // Used for the size retention.
// The values in evtlPos represent the file position which will eventually be
// reached once the content of the write queue has been fully processed.
evtlPosMtx sync.Mutex
evtlPos chunkPos
byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk.
chkWriter *bufio.Writer // Writer for the current open file.
crc32 hash.Hash
@ -128,6 +211,8 @@ type ChunkDiskMapper struct {
// This is done after iterating through all the chunks in those files using the IterateAllChunks method.
fileMaxtSet bool
writeQueue *chunkWriteQueue
closed bool
}
@ -141,7 +226,7 @@ type mmappedChunkFile struct {
// using the default head chunk file duration.
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the files.
func NewChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*ChunkDiskMapper, error) {
func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Pool, writeBufferSize, writeQueueSize int) (*ChunkDiskMapper, error) {
// Validate write buffer size.
if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize)
@ -165,6 +250,7 @@ func NewChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*C
crc32: newCRC32(),
chunkBuffer: newChunkBuffer(),
}
m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, m.writeChunk)
if m.pool == nil {
m.pool = chunkenc.NewPool()
@ -235,6 +321,8 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
}
}
cdm.evtlPos.initSeq(uint64(lastSeq))
return nil
}
@ -287,17 +375,44 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
// WriteChunk writes the chunk to the disk.
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk) (chkRef ChunkDiskMapperRef, err error) {
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
var err error
defer func() {
if err != nil && callback != nil {
callback(err)
}
}()
// cdm.evtlPosMtx must be held to serialize the calls to .getNextChunkRef() and .addJob().
cdm.evtlPosMtx.Lock()
defer cdm.evtlPosMtx.Unlock()
ref, cutFile := cdm.evtlPos.getNextChunkRef(chk)
err = cdm.writeQueue.addJob(chunkWriteJob{
cutFile: cutFile,
seriesRef: seriesRef,
mint: mint,
maxt: maxt,
chk: chk,
ref: ref,
callback: callback,
})
return ref
}
func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) (err error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
if cdm.closed {
return 0, ErrChunkDiskMapperClosed
return ErrChunkDiskMapperClosed
}
if cdm.shouldCutNewFile(len(chk.Bytes())) {
if err := cdm.cut(); err != nil {
return 0, err
if cutFile {
err := cdm.cutAndExpectRef(ref)
if err != nil {
return err
}
}
@ -305,15 +420,13 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64
// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) {
if err := cdm.flushBuffer(); err != nil {
return 0, err
return err
}
}
cdm.crc32.Reset()
bytesWritten := 0
chkRef = newChunkDiskMapperRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize()))
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef))
bytesWritten += SeriesRefSize
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint))
@ -326,59 +439,69 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64
bytesWritten += n
if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil {
return 0, err
return err
}
if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil {
return 0, err
return err
}
if err := cdm.writeCRC32(); err != nil {
return 0, err
return err
}
if maxt > cdm.curFileMaxt {
cdm.curFileMaxt = maxt
}
cdm.chunkBuffer.put(chkRef, chk)
cdm.chunkBuffer.put(ref, chk)
if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize {
// The chunk was bigger than the buffer itself.
// Flushing to not keep partial chunks in buffer.
if err := cdm.flushBuffer(); err != nil {
return 0, err
return err
}
}
return chkRef, nil
return nil
}
// shouldCutNewFile returns whether a new file should be cut, based on time and size retention.
// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map.
// Time retention: so that we can delete old chunks with some time guarantee in low load environments.
func (cdm *ChunkDiskMapper) shouldCutNewFile(chunkSize int) bool {
return cdm.curFileSize() == 0 || // First head chunk file.
cdm.curFileSize()+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
// CutNewFile ensures that a new file will be created the next time a chunk is written.
func (cdm *ChunkDiskMapper) CutNewFile() {
cdm.evtlPosMtx.Lock()
defer cdm.evtlPosMtx.Unlock()
cdm.evtlPos.cutFileOnNextChunk()
}
// CutNewFile creates a new m-mapped file.
func (cdm *ChunkDiskMapper) CutNewFile() (returnErr error) {
cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock()
// cutAndExpectRef creates a new m-mapped file.
// The write lock should be held before calling this.
// It ensures that the position in the new file matches the given chunk reference; if not, it returns an error.
func (cdm *ChunkDiskMapper) cutAndExpectRef(chkRef ChunkDiskMapperRef) (err error) {
seq, offset, err := cdm.cut()
if err != nil {
return err
}
if expSeq, expOffset := chkRef.Unpack(); seq != expSeq || offset != expOffset {
return errors.Errorf("expected newly cut file to have sequence:offset %d:%d, got %d:%d", expSeq, expOffset, seq, offset)
}
return cdm.cut()
return nil
}
// cut creates a new m-mapped file. The write lock should be held before calling this.
func (cdm *ChunkDiskMapper) cut() (returnErr error) {
// It returns the file sequence and the offset in that file to start writing chunks.
func (cdm *ChunkDiskMapper) cut() (seq, offset int, returnErr error) {
// Sync current tail to disk and close.
if err := cdm.finalizeCurFile(); err != nil {
return err
return 0, 0, err
}
n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize)
offset, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize)
if err != nil {
return err
return 0, 0, err
}
defer func() {
// The file should not be closed if there is no error,
// it's kept open in the ChunkDiskMapper.
@ -387,7 +510,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) {
}
}()
cdm.curFileNumBytes.Store(int64(n))
cdm.curFileOffset.Store(uint64(offset))
if cdm.curFile != nil {
cdm.readPathMtx.Lock()
@ -397,7 +520,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) {
mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize)
if err != nil {
return err
return 0, 0, err
}
cdm.readPathMtx.Lock()
@ -415,7 +538,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) {
cdm.curFileMaxt = 0
return nil
return seq, offset, nil
}
// finalizeCurFile writes all pending data to the current tail file,
@ -438,7 +561,7 @@ func (cdm *ChunkDiskMapper) finalizeCurFile() error {
func (cdm *ChunkDiskMapper) write(b []byte) error {
n, err := cdm.chkWriter.Write(b)
cdm.curFileNumBytes.Add(int64(n))
cdm.curFileOffset.Add(uint64(n))
return err
}
@ -476,6 +599,11 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
return nil, ErrChunkDiskMapperClosed
}
chunk := cdm.writeQueue.get(ref)
if chunk != nil {
return chunk, nil
}
sgmIndex, chkStart := ref.Unpack()
// We skip the series ref and the mint/maxt beforehand.
chkStart += SeriesRefSize + (2 * MintMaxtSize)
@ -732,7 +860,10 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
errs := tsdb_errors.NewMulti()
// Cut a new file only if the current file has some chunks.
if cdm.curFileSize() > HeadChunkFileHeaderSize {
errs.Add(cdm.CutNewFile())
// There is a known race condition here: between the check of curFileSize() and the call to CutNewFile()
// a new file could already be cut. This is acceptable because it will simply result in an empty file,
// which won't do any harm.
cdm.CutNewFile()
}
errs.Add(cdm.deleteFiles(removedFiles))
return errs.Err()
@ -787,13 +918,19 @@ func (cdm *ChunkDiskMapper) Size() (int64, error) {
return fileutil.DirSize(cdm.dir.Name())
}
func (cdm *ChunkDiskMapper) curFileSize() int64 {
return cdm.curFileNumBytes.Load()
func (cdm *ChunkDiskMapper) curFileSize() uint64 {
return cdm.curFileOffset.Load()
}
// Close closes all the open files in ChunkDiskMapper.
// It is no longer safe to access chunks from this struct after calling Close.
func (cdm *ChunkDiskMapper) Close() error {
// Holding the eventual position mutex blocks WriteChunk() from adding new jobs.
cdm.evtlPosMtx.Lock()
defer cdm.evtlPosMtx.Unlock()
cdm.writeQueue.stop()
// 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file.
// The lock order should not be reversed here else it can cause deadlocks.
cdm.writePathMtx.Lock()
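
A short worked example of the 32/32-bit seq:offset packing behind
ChunkDiskMapperRef, using the helpers shown earlier in this file:

package chunks

import "fmt"

func exampleRefPacking() {
	// The upper 32 bits hold the file sequence, the lower 32 the byte offset.
	ref := newChunkDiskMapperRef(3, 8) // File sequence 3, offset 8 (SegmentHeaderSize).
	seq, offset := ref.Unpack()
	fmt.Println(seq, offset)            // 3 8
	fmt.Println(uint64(ref) == 3<<32|8) // true: 12884901896
}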

tsdb/chunks/head_chunks_test.go (151 changed lines)

@ -28,7 +28,7 @@ import (
)
func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
hrw := testChunkDiskMapper(t)
hrw := createChunkDiskMapper(t, "")
defer func() {
require.NoError(t, hrw.Close())
}()
@ -129,8 +129,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
// Testing IterateAllChunks method.
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
hrw = createChunkDiskMapper(t, dir)
idx := 0
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
@ -156,10 +155,9 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
// TestChunkDiskMapper_Truncate tests
// * If truncation is happening properly based on the time passed.
// * The active file is not deleted even if the passed time makes it eligible to be deleted.
// * Empty current file does not lead to creation of another file after truncation.
// * Non-empty current file leads to creation of another file after truncation.
func TestChunkDiskMapper_Truncate(t *testing.T) {
hrw := testChunkDiskMapper(t)
hrw := createChunkDiskMapper(t, "")
defer func() {
require.NoError(t, hrw.Close())
}()
@ -167,16 +165,20 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
timeRange := 0
fileTimeStep := 100
var thirdFileMinT, sixthFileMinT int64
addChunk := func() int {
mint := timeRange + 1 // Just after the new file cut.
maxt := timeRange + fileTimeStep - 1 // Just before the next file.
t.Helper()
// Write a chunk to set maxt for the segment.
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
step := 100
mint, maxt := timeRange+1, timeRange+step-1
var err error
awaitCb := make(chan struct{})
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) {
err = cbErr
close(awaitCb)
})
<-awaitCb
require.NoError(t, err)
timeRange += fileTimeStep
timeRange += step
return mint
}
@ -198,7 +200,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
// Create segments 1 to 7.
for i := 1; i <= 7; i++ {
require.NoError(t, hrw.CutNewFile())
hrw.CutNewFile()
mint := int64(addChunk())
if i == 3 {
thirdFileMinT = mint
@ -210,19 +212,17 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
// Truncating files.
require.NoError(t, hrw.Truncate(thirdFileMinT))
// Add a chunk to trigger cutting of new file.
addChunk()
verifyFiles([]int{3, 4, 5, 6, 7, 8})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarted.
var err error
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
require.True(t, hrw.fileMaxtSet)
hrw = createChunkDiskMapper(t, dir)
verifyFiles([]int{3, 4, 5, 6, 7, 8})
// New file is created after restart even if last file was empty.
@ -231,15 +231,23 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
// Truncating files after restart.
require.NoError(t, hrw.Truncate(sixthFileMinT))
verifyFiles([]int{6, 7, 8, 9, 10})
verifyFiles([]int{6, 7, 8, 9})
// As the last file was empty, this creates no new files.
// Truncating a second time without adding a chunk shouldn't create a new file.
require.NoError(t, hrw.Truncate(sixthFileMinT+1))
verifyFiles([]int{6, 7, 8, 9, 10})
verifyFiles([]int{6, 7, 8, 9})
// Add a chunk to trigger cutting of new file.
addChunk()
verifyFiles([]int{6, 7, 8, 9, 10})
// Truncating till current time should not delete the current active file.
require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep))))
// Add a chunk to trigger cutting of new file.
addChunk()
verifyFiles([]int{10, 11}) // One file is the previously active file and one is newly created.
}
@ -248,23 +256,40 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation
// simply deleted all empty files instead of stopping once it encountered a non-empty file.
func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
hrw := testChunkDiskMapper(t)
hrw := createChunkDiskMapper(t, "")
defer func() {
require.NoError(t, hrw.Close())
}()
timeRange := 0
addChunk := func() {
t.Helper()
awaitCb := make(chan struct{})
step := 100
mint, maxt := timeRange+1, timeRange+step-1
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
close(awaitCb)
require.NoError(t, err)
})
<-awaitCb
timeRange += step
}
emptyFile := func() {
require.NoError(t, hrw.CutNewFile())
t.Helper()
_, _, err := hrw.cut()
require.NoError(t, err)
hrw.evtlPosMtx.Lock()
hrw.evtlPos.toNewFile()
hrw.evtlPosMtx.Unlock()
}
nonEmptyFile := func() {
t.Helper()
emptyFile()
addChunk()
}
@ -297,42 +322,48 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
// though files 4 and 6 are empty.
file2Maxt := hrw.mmappedChunkFiles[2].maxt
require.NoError(t, hrw.Truncate(file2Maxt+1))
// As 6 was empty, it should not create another file.
verifyFiles([]int{3, 4, 5, 6})
// Add chunk, so file 6 is not empty anymore.
addChunk()
// Truncate creates another file as 6 is not empty now.
require.NoError(t, hrw.Truncate(file2Maxt+1))
verifyFiles([]int{3, 4, 5, 6, 7})
verifyFiles([]int{3, 4, 5, 6})
// Truncating till file 3 should also delete file 4, because it is empty.
file3Maxt := hrw.mmappedChunkFiles[3].maxt
require.NoError(t, hrw.Truncate(file3Maxt+1))
addChunk()
verifyFiles([]int{5, 6, 7})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting checks for unsequential files.
var err error
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
verifyFiles([]int{3, 4, 5, 6, 7})
hrw = createChunkDiskMapper(t, dir)
verifyFiles([]int{5, 6, 7})
}
// TestHeadReadWriter_TruncateAfterIterateChunksError tests for
// https://github.com/prometheus/prometheus/issues/7753
func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
hrw := testChunkDiskMapper(t)
hrw := createChunkDiskMapper(t, "")
defer func() {
require.NoError(t, hrw.Close())
}()
// Write a chunk to iterate over later.
_, err := hrw.WriteChunk(1, 0, 1000, randomChunk(t))
var err error
awaitCb := make(chan struct{})
hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(cbErr error) {
err = cbErr
close(awaitCb)
})
<-awaitCb
require.NoError(t, err)
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting to recreate https://github.com/prometheus/prometheus/issues/7753.
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
hrw = createChunkDiskMapper(t, dir)
// Forcefully failing IterateAllChunks.
require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error {
@ -344,21 +375,31 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
}
func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
hrw := testChunkDiskMapper(t)
hrw := createChunkDiskMapper(t, "")
defer func() {
require.NoError(t, hrw.Close())
}()
timeRange := 0
addChunk := func() {
t.Helper()
step := 100
mint, maxt := timeRange+1, timeRange+step-1
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
var err error
awaitCb := make(chan struct{})
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) {
err = cbErr
close(awaitCb)
})
<-awaitCb
require.NoError(t, err)
timeRange += step
}
nonEmptyFile := func() {
require.NoError(t, hrw.CutNewFile())
t.Helper()
hrw.CutNewFile()
addChunk()
}
@ -388,11 +429,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
require.NoError(t, f.Close())
// Open chunk disk mapper again, corrupt file should be removed.
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
require.True(t, hrw.fileMaxtSet)
hrw = createChunkDiskMapper(t, dir)
// Removed from memory.
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
@ -411,18 +448,22 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
}
}
func testChunkDiskMapper(t *testing.T) *ChunkDiskMapper {
tmpdir, err := ioutil.TempDir("", "data")
func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper {
if dir == "" {
var err error
dir, err = ioutil.TempDir("", "data")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpdir))
require.NoError(t, os.RemoveAll(dir))
})
}
hrw, err := NewChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize)
hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, DefaultWriteQueueSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
require.True(t, hrw.fileMaxtSet)
return hrw
}
@ -443,7 +484,11 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer
mint = int64((idx)*1000 + 1)
maxt = int64((idx + 1) * 1000)
chunk = randomChunk(t)
chunkRef, err = hrw.WriteChunk(seriesRef, mint, maxt, chunk)
awaitCb := make(chan struct{})
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
require.NoError(t, cbErr)
close(awaitCb)
})
<-awaitCb
return
}

tsdb/db.go (8 changed lines)

@ -80,6 +80,7 @@ func DefaultOptions() *Options {
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
}
}
@ -135,6 +136,9 @@ type Options struct {
// HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper.
HeadChunksWriteBufferSize int
// HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper.
HeadChunksWriteQueueSize int
// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
SeriesLifecycleCallback SeriesLifecycleCallback
@ -582,6 +586,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
if opts.HeadChunksWriteBufferSize <= 0 {
opts.HeadChunksWriteBufferSize = chunks.DefaultWriteBufferSize
}
if opts.HeadChunksWriteQueueSize < 0 {
opts.HeadChunksWriteQueueSize = chunks.DefaultWriteQueueSize
}
if opts.MaxBlockChunkSegmentSize <= 0 {
opts.MaxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize
}
@ -704,6 +711,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.ChunkDirRoot = dir
headOpts.ChunkPool = db.chunkPool
headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize
headOpts.ChunkWriteQueueSize = opts.HeadChunksWriteQueueSize
headOpts.StripeSize = opts.StripeSize
headOpts.SeriesCallback = opts.SeriesLifecycleCallback
headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
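
For embedders of the tsdb package, the queue size is tunable through the new
option above. A minimal sketch; doubling the default is a hypothetical choice,
not a recommendation:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

func main() {
	opts := tsdb.DefaultOptions()
	// Hypothetical sizing: double the default queue to absorb larger bursts
	// of head-chunk writes before WriteChunk() starts blocking.
	opts.HeadChunksWriteQueueSize = 2 * chunks.DefaultWriteQueueSize

	// The value flows through open() into HeadOptions.ChunkWriteQueueSize and
	// from there into NewChunkDiskMapper (see tsdb/head.go below).
	fmt.Println("chunk write queue size:", opts.HeadChunksWriteQueueSize)
}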

tsdb/db_test.go (10 changed lines)

@ -212,17 +212,13 @@ func TestNoPanicAfterWALCorruption(t *testing.T) {
var maxt int64
ctx := context.Background()
{
for {
// Appending 121 samples because on the 121st a new chunk will be created.
for i := 0; i < 121; i++ {
app := db.Appender(ctx)
_, err := app.Append(0, labels.FromStrings("foo", "bar"), maxt, 0)
expSamples = append(expSamples, sample{t: maxt, v: 0})
require.NoError(t, err)
require.NoError(t, app.Commit())
mmapedChunks, err := ioutil.ReadDir(mmappedChunksDir(db.Dir()))
require.NoError(t, err)
if len(mmapedChunks) > 0 {
break
}
maxt++
}
require.NoError(t, db.Close())
@ -2453,7 +2449,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
require.NoError(t, err)
}
require.NoError(t, app.Commit())
defer func() { require.NoError(t, db.Close()) }()
require.NoError(t, db.Close())
}
// Flush WAL.

tsdb/head.go (5 changed lines)

@ -127,6 +127,8 @@ type HeadOptions struct {
ChunkDirRoot string
ChunkPool chunkenc.Pool
ChunkWriteBufferSize int
ChunkWriteQueueSize int
// StripeSize sets the number of entries in the hash map, it must be a power of 2.
// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
@ -144,6 +146,7 @@ func DefaultHeadOptions() *HeadOptions {
ChunkDirRoot: "",
ChunkPool: chunkenc.NewPool(),
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
ChunkWriteQueueSize: chunks.DefaultWriteQueueSize,
StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{},
IsolationDisabled: defaultIsolationDisabled,
@ -208,9 +211,11 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
}
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
r,
mmappedChunksDir(opts.ChunkDirRoot),
opts.ChunkPool,
opts.ChunkWriteBufferSize,
opts.ChunkWriteQueueSize,
)
if err != nil {
return nil, err

13
tsdb/head_append.go

@ -571,12 +571,7 @@ func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper
return
}
chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk)
if err != nil {
if err != chunks.ErrChunkDiskMapperClosed {
panic(err)
}
}
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, handleChunkWriteError)
s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
ref: chunkRef,
numSamples: uint16(s.headChunk.chunk.NumSamples()),
@ -585,6 +580,12 @@ func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper
})
}
func handleChunkWriteError(err error) {
if err != nil && err != chunks.ErrChunkDiskMapperClosed {
panic(err)
}
}
// Rollback removes the samples and exemplars from headAppender and writes any series to WAL.
func (a *headAppender) Rollback() (err error) {
if a.closed {

tsdb/head_test.go (206 changed lines)

@ -35,6 +35,7 @@ import (
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
@ -224,7 +225,7 @@ func BenchmarkLoadWAL(b *testing.B) {
// Write mmapped chunks.
if c.mmappedChunkT != 0 {
chunkDiskMapper, err := chunks.NewChunkDiskMapper(mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(b, err)
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
@ -270,6 +271,195 @@ func BenchmarkLoadWAL(b *testing.B) {
}
}
// TestHead_HighConcurrencyReadAndWrite generates 1000 series with a step of 15s and fills a whole block with samples;
// in total this generates 4000 chunks, because with a step of 15s there are 4 chunks per block per series.
// While appending the samples to the head it concurrently queries them from multiple goroutines and verifies that the
// returned results are correct.
func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, false)
defer func() {
require.NoError(t, head.Close())
}()
seriesCnt := 1000
readConcurrency := 2
writeConcurrency := 10
startTs := uint64(DefaultBlockDuration) // start at the second block relative to the unix epoch.
qryRange := uint64(5 * time.Minute.Milliseconds())
step := uint64(15 * time.Second / time.Millisecond)
endTs := startTs + uint64(DefaultBlockDuration)
labelSets := make([]labels.Labels, seriesCnt)
for i := 0; i < seriesCnt; i++ {
labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i))
}
head.Init(0)
g, ctx := errgroup.WithContext(context.Background())
whileNotCanceled := func(f func() (bool, error)) error {
for ctx.Err() == nil {
cont, err := f()
if err != nil {
return err
}
if !cont {
return nil
}
}
return nil
}
// Create one channel for each write worker; the channels will be used by the coordinator
// goroutine to coordinate which timestamps each write worker has to write.
writerTsCh := make([]chan uint64, writeConcurrency)
for writerTsChIdx := range writerTsCh {
writerTsCh[writerTsChIdx] = make(chan uint64)
}
// workerReadyWg is used to synchronize the start of the test,
// we only start the test once all workers signal that they're ready.
var workerReadyWg sync.WaitGroup
workerReadyWg.Add(writeConcurrency + readConcurrency)
// Start the write workers.
for wid := 0; wid < writeConcurrency; wid++ {
// Create copy of workerID to be used by worker routine.
workerID := wid
g.Go(func() error {
// The label sets which this worker will write.
workerLabelSets := labelSets[(seriesCnt/writeConcurrency)*workerID : (seriesCnt/writeConcurrency)*(workerID+1)]
// Signal that this worker is ready.
workerReadyWg.Done()
return whileNotCanceled(func() (bool, error) {
ts, ok := <-writerTsCh[workerID]
if !ok {
return false, nil
}
app := head.Appender(ctx)
for i := 0; i < len(workerLabelSets); i++ {
// We also use the timestamp as the sample value.
_, err := app.Append(0, workerLabelSets[i], int64(ts), float64(ts))
if err != nil {
return false, fmt.Errorf("Error when appending to head: %w", err)
}
}
return true, app.Commit()
})
})
}
// queryHead is a helper to query the head for a given time range and labelset.
queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]tsdbutil.Sample, error) {
q, err := NewBlockQuerier(head, int64(mint), int64(maxt))
if err != nil {
return nil, err
}
return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
}
// readerTsCh will be used by the coordinator goroutine to coordinate which timestamps the reader should read.
readerTsCh := make(chan uint64)
// Start the read workers.
for wid := 0; wid < readConcurrency; wid++ {
// Create copy of workerID to be used by worker routine.
workerID := wid
g.Go(func() error {
querySeriesRef := (seriesCnt / readConcurrency) * workerID
// Signal that this worker is ready.
workerReadyWg.Done()
return whileNotCanceled(func() (bool, error) {
ts, ok := <-readerTsCh
if !ok {
return false, nil
}
querySeriesRef = (querySeriesRef + 1) % seriesCnt
lbls := labelSets[querySeriesRef]
samples, err := queryHead(ts-qryRange, ts, lbls[0])
if err != nil {
return false, err
}
if len(samples) != 1 {
return false, fmt.Errorf("expected 1 series, got %d", len(samples))
}
series := lbls.String()
expectSampleCnt := qryRange/step + 1
if expectSampleCnt != uint64(len(samples[series])) {
return false, fmt.Errorf("expected %d samples, got %d", expectSampleCnt, len(samples[series]))
}
for sampleIdx, sample := range samples[series] {
expectedValue := ts - qryRange + (uint64(sampleIdx) * step)
if sample.T() != int64(expectedValue) {
return false, fmt.Errorf("expected sample %d to have ts %d, got %d", sampleIdx, expectedValue, sample.T())
}
if sample.V() != float64(expectedValue) {
return false, fmt.Errorf("expected sample %d to have value %d, got %f", sampleIdx, expectedValue, sample.V())
}
}
return true, nil
})
})
}
// Start the coordinator goroutine.
g.Go(func() error {
currTs := startTs
defer func() {
// End of the test, close all channels to stop the workers.
for _, ch := range writerTsCh {
close(ch)
}
close(readerTsCh)
}()
// Wait until all workers are ready to start the test.
workerReadyWg.Wait()
return whileNotCanceled(func() (bool, error) {
// Send the current timestamp to each of the writers.
for _, ch := range writerTsCh {
select {
case ch <- currTs:
case <-ctx.Done():
return false, nil
}
}
// Once data for at least <qryRange> has been ingested, send the current timestamp to the readers.
if currTs > startTs+qryRange {
select {
case readerTsCh <- currTs - step:
case <-ctx.Done():
return false, nil
}
}
currTs += step
if currTs > endTs {
return false, nil
}
return true, nil
})
})
require.NoError(t, g.Wait())
}
func TestHead_ReadWAL(t *testing.T) {
for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
@ -540,7 +730,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
defer func() {
require.NoError(t, chunkDiskMapper.Close())
@ -1084,7 +1274,7 @@ func TestMemSeries_append(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
defer func() {
require.NoError(t, chunkDiskMapper.Close())
@ -1462,14 +1652,16 @@ func TestHeadReadWriterRepair(t *testing.T) {
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunk was created")
require.NoError(t, h.chunkDiskMapper.CutNewFile())
h.chunkDiskMapper.CutNewFile()
}
require.NoError(t, h.Close())
// Verify that there are 7 segment files.
// Verify that there are 6 segment files.
// It should only be 6 because the last call to .CutNewFile() won't
// take effect without another chunk being written.
files, err := ioutil.ReadDir(mmappedChunksDir(dir))
require.NoError(t, err)
require.Equal(t, 7, len(files))
require.Equal(t, 6, len(files))
// Corrupt the 4th file by writing a random byte to series ref.
f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0o666)
@ -2270,7 +2462,7 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
defer func() {
require.NoError(t, chunkDiskMapper.Close())
