2018-05-16 12:02:55 +00:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2022-10-10 15:08:46 +00:00
package wlog
2018-05-16 12:02:55 +00:00
import (
"bytes"
2023-08-14 21:42:02 +00:00
"crypto/rand"
2019-01-16 18:09:08 +00:00
"fmt"
2020-10-29 10:37:03 +00:00
"io"
2018-05-16 12:02:55 +00:00
"os"
2019-02-18 19:05:07 +00:00
"path/filepath"
2018-05-16 12:02:55 +00:00
"testing"
2020-10-22 09:00:08 +00:00
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
2020-10-29 09:43:23 +00:00
"github.com/stretchr/testify/require"
2020-07-21 08:08:06 +00:00
"go.uber.org/goleak"
2019-11-12 02:40:16 +00:00
2020-10-22 09:00:08 +00:00
"github.com/prometheus/prometheus/tsdb/fileutil"
2019-08-14 09:07:02 +00:00
"github.com/prometheus/prometheus/util/testutil"
2018-05-16 12:02:55 +00:00
)
2020-07-21 08:08:06 +00:00
// TestMain is the package's test entry point. It hands the *testing.M for the
// run to goleak.VerifyTestMain instead of calling m.Run directly.
// NOTE(review): presumably goleak fails the run when goroutines are still
// alive after all tests have finished — confirm against the goleak docs.
func TestMain ( m * testing . M ) {
	goleak . VerifyTestMain ( m )
}
2019-06-12 14:10:37 +00:00
// TestWALRepair_ReadingError ensures that a repair is run for an error
// when reading a record.
func TestWALRepair_ReadingError ( t * testing . T ) {
2018-11-28 13:15:11 +00:00
for name , test := range map [ string ] struct {
corrSgm int // Which segment to corrupt.
corrFunc func ( f * os . File ) // Func that applies the corruption.
intactRecs int // Total expected records left after the repair.
} {
"torn_last_record" : {
2 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize * 2 , 0 )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
_ , err = f . Write ( [ ] byte { byte ( recFirst ) } )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
} ,
8 ,
} ,
// Ensures that the page buffer is big enough to fit
2019-09-30 15:54:55 +00:00
// an entire page size without panicking.
2020-01-21 19:30:20 +00:00
// https://github.com/prometheus/tsdb/pull/414
2018-11-28 13:15:11 +00:00
"bad_header" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize , 0 )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
_ , err = f . Write ( [ ] byte { byte ( recPageTerm ) } )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
} ,
4 ,
2018-11-14 16:43:33 +00:00
} ,
2018-11-28 13:15:11 +00:00
"bad_fragment_sequence" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize , 0 )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
_ , err = f . Write ( [ ] byte { byte ( recLast ) } )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
} ,
4 ,
2018-05-17 13:00:32 +00:00
} ,
2018-11-28 13:15:11 +00:00
"bad_fragment_flag" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize , 0 )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
_ , err = f . Write ( [ ] byte { 123 } )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
} ,
4 ,
2018-05-17 13:00:32 +00:00
} ,
2018-11-28 13:15:11 +00:00
"bad_checksum" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize + 4 , 0 )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
_ , err = f . Write ( [ ] byte { 0 } )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
} ,
4 ,
2018-05-17 13:00:32 +00:00
} ,
2018-11-28 13:15:11 +00:00
"bad_length" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize + 2 , 0 )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
_ , err = f . Write ( [ ] byte { 0 } )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
} ,
4 ,
2018-05-17 13:00:32 +00:00
} ,
2018-11-28 13:15:11 +00:00
"bad_content" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize + 100 , 0 )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
_ , err = f . Write ( [ ] byte ( "beef" ) )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-11-28 13:15:11 +00:00
} ,
4 ,
2018-05-17 13:00:32 +00:00
} ,
} {
t . Run ( name , func ( t * testing . T ) {
2021-11-01 06:58:18 +00:00
dir := t . TempDir ( )
2018-05-17 13:00:32 +00:00
2018-11-28 13:15:11 +00:00
// We create 3 segments with 3 records each and
// then corrupt a given record in a given segment.
// As a result we want a repaired WAL with given intact records.
2019-02-25 10:10:27 +00:00
segSize := 3 * pageSize
2023-07-11 12:57:57 +00:00
w , err := NewSize ( nil , nil , dir , segSize , CompressionNone )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-05-17 13:00:32 +00:00
var records [ ] [ ] byte
for i := 1 ; i <= 9 ; i ++ {
b := make ( [ ] byte , pageSize - recordHeaderSize )
b [ 0 ] = byte ( i )
records = append ( records , b )
2020-10-29 09:43:23 +00:00
require . NoError ( t , w . Log ( b ) )
2018-05-17 13:00:32 +00:00
}
2020-09-01 09:16:57 +00:00
first , last , err := Segments ( w . Dir ( ) )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2022-10-10 15:08:46 +00:00
require . Equal ( t , 3 , 1 + last - first , "wlog creation didn't result in expected number of segments" )
2019-06-12 14:10:37 +00:00
2020-10-29 09:43:23 +00:00
require . NoError ( t , w . Close ( ) )
2018-05-17 13:00:32 +00:00
2021-10-22 08:06:44 +00:00
f , err := os . OpenFile ( SegmentName ( dir , test . corrSgm ) , os . O_RDWR , 0 o666 )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-05-17 13:00:32 +00:00
// Apply corruption function.
2018-11-28 13:15:11 +00:00
test . corrFunc ( f )
2018-05-17 13:00:32 +00:00
2020-10-29 09:43:23 +00:00
require . NoError ( t , f . Close ( ) )
2018-05-17 13:00:32 +00:00
2023-07-11 12:57:57 +00:00
w , err = NewSize ( nil , nil , dir , segSize , CompressionNone )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-03-19 13:31:57 +00:00
defer w . Close ( )
2018-05-17 13:00:32 +00:00
2020-09-01 09:16:57 +00:00
first , last , err = Segments ( w . Dir ( ) )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2018-05-17 13:00:32 +00:00
2019-05-24 18:33:28 +00:00
// Backfill segments from the most recent checkpoint onwards.
for i := first ; i <= last ; i ++ {
s , err := OpenReadSegment ( SegmentName ( w . Dir ( ) , i ) )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-05-24 18:33:28 +00:00
sr := NewSegmentBufReader ( s )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-05-24 18:33:28 +00:00
r := NewReader ( sr )
2023-10-31 11:35:13 +00:00
for r . Next ( ) {
2019-05-24 18:33:28 +00:00
}
2021-10-22 08:06:44 +00:00
// Close the segment so we don't break things on Windows.
2019-05-24 18:33:28 +00:00
s . Close ( )
// No corruption in this segment.
if r . Err ( ) == nil {
continue
}
2020-10-29 09:43:23 +00:00
require . NoError ( t , w . Repair ( r . Err ( ) ) )
2019-05-24 18:33:28 +00:00
break
2018-05-17 13:00:32 +00:00
}
2019-05-24 18:33:28 +00:00
sr , err := NewSegmentsReader ( dir )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-03-19 13:31:57 +00:00
defer sr . Close ( )
2019-05-24 18:33:28 +00:00
r := NewReader ( sr )
2018-05-17 13:00:32 +00:00
var result [ ] [ ] byte
for r . Next ( ) {
var b [ ] byte
result = append ( result , append ( b , r . Record ( ) ... ) )
}
2020-10-29 09:43:23 +00:00
require . NoError ( t , r . Err ( ) )
2023-12-07 11:35:01 +00:00
require . Len ( t , result , test . intactRecs , "Wrong number of intact records" )
2018-05-17 13:00:32 +00:00
for i , r := range result {
if ! bytes . Equal ( records [ i ] , r ) {
t . Fatalf ( "record %d diverges: want %x, got %x" , i , records [ i ] [ : 10 ] , r [ : 10 ] )
}
}
2019-02-25 10:10:27 +00:00
2019-05-24 18:33:28 +00:00
// Make sure there is a new 0 size Segment after the corrupted Segment.
2020-09-01 09:16:57 +00:00
_ , last , err = Segments ( w . Dir ( ) )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
require . Equal ( t , test . corrSgm + 1 , last )
2019-05-24 18:33:28 +00:00
fi , err := os . Stat ( SegmentName ( dir , last ) )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
require . Equal ( t , int64 ( 0 ) , fi . Size ( ) )
2018-05-17 13:00:32 +00:00
} )
}
}
2019-02-18 19:05:07 +00:00
// TestCorruptAndCarryOn writes a multi-segment WAL; corrupts the first segment and
// ensures that an error during reading that segment are correctly repaired before
// moving to write more records to the WAL.
func TestCorruptAndCarryOn ( t * testing . T ) {
2021-11-01 06:58:18 +00:00
dir := t . TempDir ( )
2019-02-18 19:05:07 +00:00
var (
logger = testutil . NewLogger ( t )
segmentSize = pageSize * 3
recordSize = ( pageSize / 3 ) - recordHeaderSize
)
// Produce a WAL with a two segments of 3 pages with 3 records each,
// so when we truncate the file we're guaranteed to split a record.
{
2023-07-11 12:57:57 +00:00
w , err := NewSize ( logger , nil , dir , segmentSize , CompressionNone )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
for i := 0 ; i < 18 ; i ++ {
buf := make ( [ ] byte , recordSize )
_ , err := rand . Read ( buf )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
err = w . Log ( buf )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
}
err = w . Close ( )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
}
// Check all the segments are the correct size.
{
segments , err := listSegments ( dir )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
for _ , segment := range segments {
2021-10-22 08:06:44 +00:00
f , err := os . OpenFile ( filepath . Join ( dir , fmt . Sprintf ( "%08d" , segment . index ) ) , os . O_RDONLY , 0 o666 )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
fi , err := f . Stat ( )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
t . Log ( "segment" , segment . index , "size" , fi . Size ( ) )
2020-10-29 09:43:23 +00:00
require . Equal ( t , int64 ( segmentSize ) , fi . Size ( ) )
2019-02-18 19:05:07 +00:00
err = f . Close ( )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
}
}
// Truncate the first file, splitting the middle record in the second
// page in half, leaving 4 valid records.
{
2021-10-22 08:06:44 +00:00
f , err := os . OpenFile ( filepath . Join ( dir , fmt . Sprintf ( "%08d" , 0 ) ) , os . O_RDWR , 0 o666 )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
fi , err := f . Stat ( )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
require . Equal ( t , int64 ( segmentSize ) , fi . Size ( ) )
2019-02-18 19:05:07 +00:00
err = f . Truncate ( int64 ( segmentSize / 2 ) )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
err = f . Close ( )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
}
// Now try and repair this WAL, and write 5 more records to it.
{
sr , err := NewSegmentsReader ( dir )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
reader := NewReader ( sr )
i := 0
for ; i < 4 && reader . Next ( ) ; i ++ {
2023-12-07 11:35:01 +00:00
require . Len ( t , reader . Record ( ) , recordSize )
2019-02-18 19:05:07 +00:00
}
2020-10-29 09:43:23 +00:00
require . Equal ( t , 4 , i , "not enough records" )
require . False ( t , reader . Next ( ) , "unexpected record" )
2019-02-18 19:05:07 +00:00
corruptionErr := reader . Err ( )
2020-10-29 09:43:23 +00:00
require . Error ( t , corruptionErr )
2019-02-18 19:05:07 +00:00
err = sr . Close ( )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
2023-07-11 12:57:57 +00:00
w , err := NewSize ( logger , nil , dir , segmentSize , CompressionNone )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
err = w . Repair ( corruptionErr )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
2020-01-02 14:54:09 +00:00
// Ensure that we have a completely clean slate after repairing.
2023-12-07 11:35:01 +00:00
require . Equal ( t , 1 , w . segment . Index ( ) ) // We corrupted segment 0.
require . Equal ( t , 0 , w . donePages )
2019-05-24 18:33:28 +00:00
2019-02-18 19:05:07 +00:00
for i := 0 ; i < 5 ; i ++ {
buf := make ( [ ] byte , recordSize )
_ , err := rand . Read ( buf )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
err = w . Log ( buf )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
}
err = w . Close ( )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
}
// Replay the WAL. Should get 9 records.
{
sr , err := NewSegmentsReader ( dir )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-02-18 19:05:07 +00:00
reader := NewReader ( sr )
i := 0
for ; i < 9 && reader . Next ( ) ; i ++ {
2023-12-07 11:35:01 +00:00
require . Len ( t , reader . Record ( ) , recordSize )
2019-02-18 19:05:07 +00:00
}
2020-10-29 09:43:23 +00:00
require . Equal ( t , 9 , i , "wrong number of records" )
require . False ( t , reader . Next ( ) , "unexpected record" )
2023-12-07 11:35:01 +00:00
require . NoError ( t , reader . Err ( ) )
2019-03-19 13:31:57 +00:00
sr . Close ( )
2019-02-18 19:05:07 +00:00
}
}
2019-04-30 07:17:07 +00:00
// TestClose ensures that calling Close more than once doesn't panic and doesn't block.
func TestClose ( t * testing . T ) {
2021-11-01 06:58:18 +00:00
dir := t . TempDir ( )
2023-07-11 12:57:57 +00:00
w , err := NewSize ( nil , nil , dir , pageSize , CompressionNone )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
require . NoError ( t , w . Close ( ) )
require . Error ( t , w . Close ( ) )
2019-04-30 07:17:07 +00:00
}
2019-05-17 08:47:42 +00:00
func TestSegmentMetric ( t * testing . T ) {
var (
segmentSize = pageSize
recordSize = ( pageSize / 2 ) - recordHeaderSize
)
2021-11-01 06:58:18 +00:00
dir := t . TempDir ( )
2023-07-11 12:57:57 +00:00
w , err := NewSize ( nil , nil , dir , segmentSize , CompressionNone )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-05-17 08:47:42 +00:00
2019-09-19 11:24:34 +00:00
initialSegment := client_testutil . ToFloat64 ( w . metrics . currentSegment )
2019-05-17 08:47:42 +00:00
// Write 3 records, each of which is half the segment size, meaning we should rotate to the next segment.
for i := 0 ; i < 3 ; i ++ {
buf := make ( [ ] byte , recordSize )
_ , err := rand . Read ( buf )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-05-17 08:47:42 +00:00
err = w . Log ( buf )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-05-17 08:47:42 +00:00
}
2020-10-29 09:43:23 +00:00
require . Equal ( t , initialSegment + 1 , client_testutil . ToFloat64 ( w . metrics . currentSegment ) , "segment metric did not increment after segment rotation" )
require . NoError ( t , w . Close ( ) )
2019-05-17 08:47:42 +00:00
}
2019-06-19 13:46:24 +00:00
func TestCompression ( t * testing . T ) {
2023-07-11 12:57:57 +00:00
bootstrap := func ( compressed CompressionType ) string {
2019-06-19 13:46:24 +00:00
const (
segmentSize = pageSize
recordSize = ( pageSize / 2 ) - recordHeaderSize
records = 100
)
2021-11-01 06:58:18 +00:00
dirPath := t . TempDir ( )
2019-06-19 13:46:24 +00:00
w , err := NewSize ( nil , nil , dirPath , segmentSize , compressed )
2020-10-29 09:43:23 +00:00
require . NoError ( t , err )
2019-06-19 13:46:24 +00:00
buf := make ( [ ] byte , recordSize )
for i := 0 ; i < records ; i ++ {
2020-10-29 09:43:23 +00:00
require . NoError ( t , w . Log ( buf ) )
2019-06-19 13:46:24 +00:00
}
2020-10-29 09:43:23 +00:00
require . NoError ( t , w . Close ( ) )
2019-06-19 13:46:24 +00:00
return dirPath
}
2023-07-11 12:57:57 +00:00
tmpDirs := make ( [ ] string , 0 , 3 )
2019-03-19 13:31:57 +00:00
defer func ( ) {
2023-07-11 12:57:57 +00:00
for _ , dir := range tmpDirs {
require . NoError ( t , os . RemoveAll ( dir ) )
}
2019-03-19 13:31:57 +00:00
} ( )
2018-05-16 12:02:55 +00:00
2023-07-11 12:57:57 +00:00
dirUnCompressed := bootstrap ( CompressionNone )
tmpDirs = append ( tmpDirs , dirUnCompressed )
2018-05-16 12:02:55 +00:00
2023-07-11 12:57:57 +00:00
for _ , compressionType := range [ ] CompressionType { CompressionSnappy , CompressionZstd } {
dirCompressed := bootstrap ( compressionType )
tmpDirs = append ( tmpDirs , dirCompressed )
uncompressedSize , err := fileutil . DirSize ( dirUnCompressed )
require . NoError ( t , err )
compressedSize , err := fileutil . DirSize ( dirCompressed )
require . NoError ( t , err )
require . Greater ( t , float64 ( uncompressedSize ) * 0.75 , float64 ( compressedSize ) , "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d" , uncompressedSize , compressedSize )
}
2019-06-19 13:46:24 +00:00
}
2018-05-16 12:02:55 +00:00
2020-10-29 10:37:03 +00:00
func TestLogPartialWrite ( t * testing . T ) {
const segmentSize = pageSize * 2
record := [ ] byte { 1 , 2 , 3 , 4 , 5 }
tests := map [ string ] struct {
numRecords int
faultyRecord int
} {
"partial write when logging first record in a page" : {
numRecords : 10 ,
faultyRecord : 1 ,
} ,
"partial write when logging record in the middle of a page" : {
numRecords : 10 ,
faultyRecord : 3 ,
} ,
"partial write when logging last record of a page" : {
numRecords : ( pageSize / ( recordHeaderSize + len ( record ) ) ) + 10 ,
faultyRecord : pageSize / ( recordHeaderSize + len ( record ) ) ,
} ,
// TODO the current implementation suffers this:
2023-04-09 07:08:40 +00:00
// "partial write when logging a record overlapping two pages": {
2020-10-29 10:37:03 +00:00
// numRecords: (pageSize / (recordHeaderSize + len(record))) + 10,
// faultyRecord: pageSize/(recordHeaderSize+len(record)) + 1,
2023-04-09 07:08:40 +00:00
// },
2020-10-29 10:37:03 +00:00
}
for testName , testData := range tests {
t . Run ( testName , func ( t * testing . T ) {
2021-11-01 06:58:18 +00:00
dirPath := t . TempDir ( )
2020-10-29 10:37:03 +00:00
2023-07-11 12:57:57 +00:00
w , err := NewSize ( nil , nil , dirPath , segmentSize , CompressionNone )
2020-10-29 10:37:03 +00:00
require . NoError ( t , err )
// Replace the underlying segment file with a mocked one that injects a failure.
w . segment . SegmentFile = & faultySegmentFile {
SegmentFile : w . segment . SegmentFile ,
writeFailureAfter : ( ( recordHeaderSize + len ( record ) ) * ( testData . faultyRecord - 1 ) ) + 2 ,
writeFailureErr : io . ErrShortWrite ,
}
for i := 1 ; i <= testData . numRecords ; i ++ {
if err := w . Log ( record ) ; i == testData . faultyRecord {
2023-12-07 11:35:01 +00:00
require . ErrorIs ( t , io . ErrShortWrite , err )
2020-10-29 10:37:03 +00:00
} else {
require . NoError ( t , err )
}
}
require . NoError ( t , w . Close ( ) )
// Read it back. We expect no corruption.
s , err := OpenReadSegment ( SegmentName ( dirPath , 0 ) )
require . NoError ( t , err )
2021-01-15 08:19:57 +00:00
defer func ( ) { require . NoError ( t , s . Close ( ) ) } ( )
2020-10-29 10:37:03 +00:00
r := NewReader ( NewSegmentBufReader ( s ) )
for i := 0 ; i < testData . numRecords ; i ++ {
require . True ( t , r . Next ( ) )
require . NoError ( t , r . Err ( ) )
require . Equal ( t , record , r . Record ( ) )
}
require . False ( t , r . Next ( ) )
require . NoError ( t , r . Err ( ) )
} )
}
}
// faultySegmentFile wraps a SegmentFile and injects a single failing, partial
// write once a configured number of bytes has been written through it.
type faultySegmentFile struct {
	// Underlying segment file that all writes are forwarded to.
	SegmentFile
	// Running total of the byte counts returned by the underlying Write calls.
	written int
	// Byte count at which a write gets cut short and fails. A negative value
	// disables injection; Write sets it to -1 once the failure has fired.
	writeFailureAfter int
	// Error returned by the Write call in which the failure is injected.
	writeFailureErr error
}
// Write forwards p to the wrapped SegmentFile and keeps count of the bytes
// written. The first write that would push the count past writeFailureAfter
// is cut short: only a prefix of p is written and writeFailureErr is returned
// together with the number of bytes that did get written. Injection then
// disarms itself, so all later writes pass through unchanged.
func (f *faultySegmentFile) Write(p []byte) (int, error) {
	armed := f.writeFailureAfter >= 0
	if !armed || f.written+len(p) <= f.writeFailureAfter {
		// Below the failure threshold (or already disarmed): plain pass-through.
		count, err := f.SegmentFile.Write(p)
		f.written += count
		return count, err
	}

	// Write only the bytes up to the threshold. If that prefix would be empty
	// or would cover all of p, write a single byte instead.
	cut := f.writeFailureAfter - f.written
	if cut < 1 || cut >= len(p) {
		cut = 1
	}

	// The underlying error is deliberately dropped; the injected one is returned.
	count, _ := f.SegmentFile.Write(p[:cut])
	f.written += count
	f.writeFailureAfter = -1 // Fail only once.
	return count, f.writeFailureErr
}
2019-06-19 13:46:24 +00:00
func BenchmarkWAL_LogBatched ( b * testing . B ) {
2023-07-11 12:57:57 +00:00
for _ , compress := range [ ] CompressionType { CompressionNone , CompressionSnappy , CompressionZstd } {
b . Run ( fmt . Sprintf ( "compress=%s" , compress ) , func ( b * testing . B ) {
2021-11-01 06:58:18 +00:00
dir := b . TempDir ( )
2019-06-19 13:46:24 +00:00
w , err := New ( nil , nil , dir , compress )
2020-10-29 09:43:23 +00:00
require . NoError ( b , err )
2019-06-19 13:46:24 +00:00
defer w . Close ( )
var buf [ 2048 ] byte
var recs [ ] [ ] byte
b . SetBytes ( 2048 )
for i := 0 ; i < b . N ; i ++ {
recs = append ( recs , buf [ : ] )
if len ( recs ) < 1000 {
continue
}
err := w . Log ( recs ... )
2020-10-29 09:43:23 +00:00
require . NoError ( b , err )
2019-06-19 13:46:24 +00:00
recs = recs [ : 0 ]
}
// Stop timer to not count fsync time on close.
// If it's counted batched vs. single benchmarks are very similar but
// do not show burst throughput well.
b . StopTimer ( )
} )
2018-05-16 12:02:55 +00:00
}
}
func BenchmarkWAL_Log ( b * testing . B ) {
2023-07-11 12:57:57 +00:00
for _ , compress := range [ ] CompressionType { CompressionNone , CompressionSnappy , CompressionZstd } {
b . Run ( fmt . Sprintf ( "compress=%s" , compress ) , func ( b * testing . B ) {
2021-11-01 06:58:18 +00:00
dir := b . TempDir ( )
2018-05-16 12:02:55 +00:00
2019-06-19 13:46:24 +00:00
w , err := New ( nil , nil , dir , compress )
2020-10-29 09:43:23 +00:00
require . NoError ( b , err )
2019-06-19 13:46:24 +00:00
defer w . Close ( )
2018-05-16 12:02:55 +00:00
2019-06-19 13:46:24 +00:00
var buf [ 2048 ] byte
b . SetBytes ( 2048 )
2018-05-16 12:02:55 +00:00
2019-06-19 13:46:24 +00:00
for i := 0 ; i < b . N ; i ++ {
err := w . Log ( buf [ : ] )
2020-10-29 09:43:23 +00:00
require . NoError ( b , err )
2019-06-19 13:46:24 +00:00
}
// Stop timer to not count fsync time on close.
// If it's counted batched vs. single benchmarks are very similar but
// do not show burst throughput well.
b . StopTimer ( )
} )
2018-05-16 12:02:55 +00:00
}
}