mirror of https://github.com/prometheus/prometheus
505 lines
14 KiB
Go
505 lines
14 KiB
Go
// Copyright 2020 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package chunks
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"math/rand"
|
|
"os"
|
|
"strconv"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
)
|
|
|
|
var writeQueueSize int
|
|
|
|
func TestMain(m *testing.M) {
|
|
// Run all tests with the chunk write queue disabled.
|
|
writeQueueSize = 0
|
|
exitVal := m.Run()
|
|
if exitVal != 0 {
|
|
os.Exit(exitVal)
|
|
}
|
|
|
|
// Re-run all tests with the chunk write queue size of 1e6.
|
|
writeQueueSize = 1000000
|
|
exitVal = m.Run()
|
|
os.Exit(exitVal)
|
|
}
|
|
|
|
func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
|
|
hrw := createChunkDiskMapper(t, "")
|
|
defer func() {
|
|
require.NoError(t, hrw.Close())
|
|
}()
|
|
|
|
expectedBytes := []byte{}
|
|
nextChunkOffset := uint64(HeadChunkFileHeaderSize)
|
|
chkCRC32 := newCRC32()
|
|
|
|
type expectedDataType struct {
|
|
seriesRef HeadSeriesRef
|
|
chunkRef ChunkDiskMapperRef
|
|
mint, maxt int64
|
|
numSamples uint16
|
|
chunk chunkenc.Chunk
|
|
}
|
|
expectedData := []expectedDataType{}
|
|
|
|
var buf [MaxHeadChunkMetaSize]byte
|
|
totalChunks := 0
|
|
var firstFileName string
|
|
for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 {
|
|
addChunks := func(numChunks int) {
|
|
for i := 0; i < numChunks; i++ {
|
|
seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw)
|
|
totalChunks++
|
|
expectedData = append(expectedData, expectedDataType{
|
|
seriesRef: seriesRef,
|
|
mint: mint,
|
|
maxt: maxt,
|
|
chunkRef: chkRef,
|
|
chunk: chunk,
|
|
numSamples: uint16(chunk.NumSamples()),
|
|
})
|
|
|
|
if hrw.curFileSequence != 1 {
|
|
// We are checking for bytes written only for the first file.
|
|
continue
|
|
}
|
|
|
|
// Calculating expected bytes written on disk for first file.
|
|
firstFileName = hrw.curFile.Name()
|
|
require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef)
|
|
|
|
bytesWritten := 0
|
|
chkCRC32.Reset()
|
|
|
|
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef))
|
|
bytesWritten += SeriesRefSize
|
|
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint))
|
|
bytesWritten += MintMaxtSize
|
|
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
|
|
bytesWritten += MintMaxtSize
|
|
buf[bytesWritten] = byte(chunk.Encoding())
|
|
bytesWritten += ChunkEncodingSize
|
|
n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
|
|
bytesWritten += n
|
|
|
|
expectedBytes = append(expectedBytes, buf[:bytesWritten]...)
|
|
_, err := chkCRC32.Write(buf[:bytesWritten])
|
|
require.NoError(t, err)
|
|
expectedBytes = append(expectedBytes, chunk.Bytes()...)
|
|
_, err = chkCRC32.Write(chunk.Bytes())
|
|
require.NoError(t, err)
|
|
|
|
expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...)
|
|
|
|
// += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC.
|
|
nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize
|
|
}
|
|
}
|
|
addChunks(100)
|
|
hrw.CutNewFile()
|
|
addChunks(10) // For chunks in in-memory buffer.
|
|
}
|
|
|
|
// Checking on-disk bytes for the first file.
|
|
require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles))
|
|
require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
|
|
|
|
actualBytes, err := os.ReadFile(firstFileName)
|
|
require.NoError(t, err)
|
|
|
|
// Check header of the segment file.
|
|
require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize])))
|
|
require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize]))
|
|
|
|
// Remaining chunk data.
|
|
fileEnd := HeadChunkFileHeaderSize + len(expectedBytes)
|
|
require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd])
|
|
|
|
// Testing reading of chunks.
|
|
for _, exp := range expectedData {
|
|
actChunk, err := hrw.Chunk(exp.chunkRef)
|
|
require.NoError(t, err)
|
|
require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes())
|
|
}
|
|
|
|
// Testing IterateAllChunks method.
|
|
dir := hrw.dir.Name()
|
|
require.NoError(t, hrw.Close())
|
|
hrw = createChunkDiskMapper(t, dir)
|
|
|
|
idx := 0
|
|
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
|
|
t.Helper()
|
|
|
|
expData := expectedData[idx]
|
|
require.Equal(t, expData.seriesRef, seriesRef)
|
|
require.Equal(t, expData.chunkRef, chunkRef)
|
|
require.Equal(t, expData.maxt, maxt)
|
|
require.Equal(t, expData.maxt, maxt)
|
|
require.Equal(t, expData.numSamples, numSamples)
|
|
|
|
actChunk, err := hrw.Chunk(expData.chunkRef)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes())
|
|
|
|
idx++
|
|
return nil
|
|
}))
|
|
require.Equal(t, len(expectedData), idx)
|
|
}
|
|
|
|
// TestChunkDiskMapper_Truncate tests
|
|
// * If truncation is happening properly based on the time passed.
|
|
// * The active file is not deleted even if the passed time makes it eligible to be deleted.
|
|
// * Non-empty current file leads to creation of another file after truncation.
|
|
func TestChunkDiskMapper_Truncate(t *testing.T) {
|
|
hrw := createChunkDiskMapper(t, "")
|
|
defer func() {
|
|
require.NoError(t, hrw.Close())
|
|
}()
|
|
|
|
timeRange := 0
|
|
fileTimeStep := 100
|
|
var thirdFileMinT, sixthFileMinT int64
|
|
addChunk := func() int {
|
|
t.Helper()
|
|
|
|
step := 100
|
|
mint, maxt := timeRange+1, timeRange+step-1
|
|
var err error
|
|
awaitCb := make(chan struct{})
|
|
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) {
|
|
err = cbErr
|
|
close(awaitCb)
|
|
})
|
|
<-awaitCb
|
|
require.NoError(t, err)
|
|
timeRange += step
|
|
|
|
return mint
|
|
}
|
|
|
|
verifyFiles := func(remainingFiles []int) {
|
|
t.Helper()
|
|
|
|
files, err := os.ReadDir(hrw.dir.Name())
|
|
require.NoError(t, err)
|
|
require.Equal(t, len(remainingFiles), len(files), "files on disk")
|
|
require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
|
|
require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")
|
|
|
|
for _, i := range remainingFiles {
|
|
_, ok := hrw.mmappedChunkFiles[i]
|
|
require.Equal(t, true, ok)
|
|
}
|
|
}
|
|
|
|
// Create segments 1 to 7.
|
|
for i := 1; i <= 7; i++ {
|
|
hrw.CutNewFile()
|
|
mint := int64(addChunk())
|
|
if i == 3 {
|
|
thirdFileMinT = mint
|
|
} else if i == 6 {
|
|
sixthFileMinT = mint
|
|
}
|
|
}
|
|
verifyFiles([]int{1, 2, 3, 4, 5, 6, 7})
|
|
|
|
// Truncating files.
|
|
require.NoError(t, hrw.Truncate(thirdFileMinT))
|
|
|
|
// Add a chunk to trigger cutting of new file.
|
|
addChunk()
|
|
|
|
verifyFiles([]int{3, 4, 5, 6, 7, 8})
|
|
|
|
dir := hrw.dir.Name()
|
|
require.NoError(t, hrw.Close())
|
|
|
|
// Restarted.
|
|
hrw = createChunkDiskMapper(t, dir)
|
|
|
|
verifyFiles([]int{3, 4, 5, 6, 7, 8})
|
|
// New file is created after restart even if last file was empty.
|
|
addChunk()
|
|
verifyFiles([]int{3, 4, 5, 6, 7, 8, 9})
|
|
|
|
// Truncating files after restart.
|
|
require.NoError(t, hrw.Truncate(sixthFileMinT))
|
|
verifyFiles([]int{6, 7, 8, 9})
|
|
|
|
// Truncating a second time without adding a chunk shouldn't create a new file.
|
|
require.NoError(t, hrw.Truncate(sixthFileMinT+1))
|
|
verifyFiles([]int{6, 7, 8, 9})
|
|
|
|
// Add a chunk to trigger cutting of new file.
|
|
addChunk()
|
|
|
|
verifyFiles([]int{6, 7, 8, 9, 10})
|
|
|
|
// Truncating till current time should not delete the current active file.
|
|
require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep))))
|
|
|
|
// Add a chunk to trigger cutting of new file.
|
|
addChunk()
|
|
|
|
verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created.
|
|
}
|
|
|
|
// TestChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke
|
|
// holes into the file sequence, even if there are empty files in between non-empty files.
|
|
// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation
|
|
// simply deleted all empty files instead of stopping once it encountered a non-empty file.
|
|
func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
|
|
hrw := createChunkDiskMapper(t, "")
|
|
defer func() {
|
|
require.NoError(t, hrw.Close())
|
|
}()
|
|
timeRange := 0
|
|
|
|
addChunk := func() {
|
|
t.Helper()
|
|
|
|
awaitCb := make(chan struct{})
|
|
|
|
step := 100
|
|
mint, maxt := timeRange+1, timeRange+step-1
|
|
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
|
|
close(awaitCb)
|
|
require.NoError(t, err)
|
|
})
|
|
<-awaitCb
|
|
timeRange += step
|
|
}
|
|
|
|
emptyFile := func() {
|
|
t.Helper()
|
|
|
|
_, _, err := hrw.cut()
|
|
require.NoError(t, err)
|
|
hrw.evtlPosMtx.Lock()
|
|
hrw.evtlPos.toNewFile()
|
|
hrw.evtlPosMtx.Unlock()
|
|
}
|
|
|
|
nonEmptyFile := func() {
|
|
t.Helper()
|
|
|
|
emptyFile()
|
|
addChunk()
|
|
}
|
|
|
|
addChunk() // 1. Created with the first chunk.
|
|
nonEmptyFile() // 2.
|
|
nonEmptyFile() // 3.
|
|
emptyFile() // 4.
|
|
nonEmptyFile() // 5.
|
|
emptyFile() // 6.
|
|
|
|
verifyFiles := func(remainingFiles []int) {
|
|
t.Helper()
|
|
|
|
files, err := os.ReadDir(hrw.dir.Name())
|
|
require.NoError(t, err)
|
|
require.Equal(t, len(remainingFiles), len(files), "files on disk")
|
|
require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
|
|
require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")
|
|
|
|
for _, i := range remainingFiles {
|
|
_, ok := hrw.mmappedChunkFiles[i]
|
|
require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i)
|
|
}
|
|
}
|
|
|
|
verifyFiles([]int{1, 2, 3, 4, 5, 6})
|
|
|
|
// Truncating files till 2. It should not delete anything after 3 (inclusive)
|
|
// though files 4 and 6 are empty.
|
|
file2Maxt := hrw.mmappedChunkFiles[2].maxt
|
|
require.NoError(t, hrw.Truncate(file2Maxt+1))
|
|
verifyFiles([]int{3, 4, 5, 6})
|
|
|
|
// Add chunk, so file 6 is not empty anymore.
|
|
addChunk()
|
|
verifyFiles([]int{3, 4, 5, 6})
|
|
|
|
// Truncating till file 3 should also delete file 4, because it is empty.
|
|
file3Maxt := hrw.mmappedChunkFiles[3].maxt
|
|
require.NoError(t, hrw.Truncate(file3Maxt+1))
|
|
addChunk()
|
|
verifyFiles([]int{5, 6, 7})
|
|
|
|
dir := hrw.dir.Name()
|
|
require.NoError(t, hrw.Close())
|
|
// Restarting checks for unsequential files.
|
|
hrw = createChunkDiskMapper(t, dir)
|
|
verifyFiles([]int{5, 6, 7})
|
|
}
|
|
|
|
// TestHeadReadWriter_TruncateAfterIterateChunksError tests for
|
|
// https://github.com/prometheus/prometheus/issues/7753
|
|
func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
|
|
hrw := createChunkDiskMapper(t, "")
|
|
defer func() {
|
|
require.NoError(t, hrw.Close())
|
|
}()
|
|
|
|
// Write a chunks to iterate on it later.
|
|
var err error
|
|
awaitCb := make(chan struct{})
|
|
hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(cbErr error) {
|
|
err = cbErr
|
|
close(awaitCb)
|
|
})
|
|
<-awaitCb
|
|
require.NoError(t, err)
|
|
|
|
dir := hrw.dir.Name()
|
|
require.NoError(t, hrw.Close())
|
|
|
|
// Restarting to recreate https://github.com/prometheus/prometheus/issues/7753.
|
|
hrw = createChunkDiskMapper(t, dir)
|
|
|
|
// Forcefully failing IterateAllChunks.
|
|
require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error {
|
|
return errors.New("random error")
|
|
}))
|
|
|
|
// Truncation call should not return error after IterateAllChunks fails.
|
|
require.NoError(t, hrw.Truncate(2000))
|
|
}
|
|
|
|
func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
|
|
hrw := createChunkDiskMapper(t, "")
|
|
defer func() {
|
|
require.NoError(t, hrw.Close())
|
|
}()
|
|
|
|
timeRange := 0
|
|
addChunk := func() {
|
|
t.Helper()
|
|
|
|
step := 100
|
|
mint, maxt := timeRange+1, timeRange+step-1
|
|
var err error
|
|
awaitCb := make(chan struct{})
|
|
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) {
|
|
err = cbErr
|
|
close(awaitCb)
|
|
})
|
|
<-awaitCb
|
|
require.NoError(t, err)
|
|
timeRange += step
|
|
}
|
|
nonEmptyFile := func() {
|
|
t.Helper()
|
|
|
|
hrw.CutNewFile()
|
|
addChunk()
|
|
}
|
|
|
|
addChunk() // 1. Created with the first chunk.
|
|
nonEmptyFile() // 2.
|
|
nonEmptyFile() // 3.
|
|
|
|
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
|
|
lastFile := 0
|
|
for idx := range hrw.mmappedChunkFiles {
|
|
if idx > lastFile {
|
|
lastFile = idx
|
|
}
|
|
}
|
|
require.Equal(t, 3, lastFile)
|
|
dir := hrw.dir.Name()
|
|
require.NoError(t, hrw.Close())
|
|
|
|
// Write an empty last file mimicking an abrupt shutdown on file creation.
|
|
emptyFileName := segmentFile(dir, lastFile+1)
|
|
f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0o666)
|
|
require.NoError(t, err)
|
|
require.NoError(t, f.Sync())
|
|
stat, err := f.Stat()
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(0), stat.Size())
|
|
require.NoError(t, f.Close())
|
|
|
|
// Open chunk disk mapper again, corrupt file should be removed.
|
|
hrw = createChunkDiskMapper(t, dir)
|
|
|
|
// Removed from memory.
|
|
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
|
|
for idx := range hrw.mmappedChunkFiles {
|
|
require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file")
|
|
}
|
|
|
|
// Removed even from disk.
|
|
files, err := os.ReadDir(dir)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 3, len(files))
|
|
for _, fi := range files {
|
|
seq, err := strconv.ParseUint(fi.Name(), 10, 64)
|
|
require.NoError(t, err)
|
|
require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file")
|
|
}
|
|
}
|
|
|
|
func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper {
|
|
if dir == "" {
|
|
dir = t.TempDir()
|
|
}
|
|
|
|
hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, writeQueueSize)
|
|
require.NoError(t, err)
|
|
require.False(t, hrw.fileMaxtSet)
|
|
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
|
|
require.True(t, hrw.fileMaxtSet)
|
|
|
|
return hrw
|
|
}
|
|
|
|
func randomChunk(t *testing.T) chunkenc.Chunk {
|
|
chunk := chunkenc.NewXORChunk()
|
|
len := rand.Int() % 120
|
|
app, err := chunk.Appender()
|
|
require.NoError(t, err)
|
|
for i := 0; i < len; i++ {
|
|
app.Append(rand.Int63(), rand.Float64())
|
|
}
|
|
return chunk
|
|
}
|
|
|
|
func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
|
|
var err error
|
|
seriesRef = HeadSeriesRef(rand.Int63())
|
|
mint = int64((idx)*1000 + 1)
|
|
maxt = int64((idx + 1) * 1000)
|
|
chunk = randomChunk(t)
|
|
awaitCb := make(chan struct{})
|
|
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
|
|
require.NoError(t, err)
|
|
close(awaitCb)
|
|
})
|
|
<-awaitCb
|
|
return
|
|
}
|