@ -17,6 +17,7 @@ import (
"math/rand"
"os"
"path"
"runtime"
"sync"
"testing"
"time"
@ -700,11 +701,46 @@ func TestRun_StartupTime(t *testing.T) {
}
}
func generateWALRecords ( w * WL , segment , seriesCount , samplesCount int ) error {
enc := record . Encoder { }
for j := 0 ; j < seriesCount ; j ++ {
ref := j + ( segment * 100 )
series := enc . Series ( [ ] record . RefSeries {
{
Ref : chunks . HeadSeriesRef ( ref ) ,
Labels : labels . FromStrings ( "__name__" , fmt . Sprintf ( "metric_%d" , segment ) ) ,
} ,
} , nil )
if err := w . Log ( series ) ; err != nil {
return err
}
for k := 0 ; k < samplesCount ; k ++ {
inner := rand . Intn ( ref + 1 )
sample := enc . Samples ( [ ] record . RefSample {
{
Ref : chunks . HeadSeriesRef ( inner ) ,
T : int64 ( segment ) ,
V : float64 ( segment ) ,
} ,
} , nil )
if err := w . Log ( sample ) ; err != nil {
return err
}
}
}
return nil
}
func TestRun_AvoidNotifyWhenBehind ( t * testing . T ) {
const pageSize = 32 * 1024
const segments = 10
const seriesCount = 20
const samplesCount = 300
if runtime . GOOS == "windows" { // Takes a really long time, perhaps because min sleep time is 15ms.
t . SkipNow ( )
}
const segmentSize = pageSize // Smallest allowed segment size.
const segmentsToWrite = 5
const segmentsToRead = segmentsToWrite - 1
const seriesCount = 10
const samplesCount = 50
// This test can take longer than intended to finish in cloud CI.
readTimeout := 10 * time . Second
@ -717,73 +753,37 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
err := os . Mkdir ( wdir , 0 o777 )
require . NoError ( t , err )
enc := record . Encoder { }
w , err := NewSize ( nil , nil , wdir , pageSize , compress )
w , err := NewSize ( nil , nil , wdir , segmentSize , compress )
require . NoError ( t , err )
var wg sync . WaitGroup
// add one segment initially to ensure there's a value > 0 for the last segment id
for i := 0 ; i < 1 ; i ++ {
for j := 0 ; j < seriesCount ; j ++ {
ref := j + ( i * 100 )
series := enc . Series ( [ ] record . RefSeries {
{
Ref : chunks . HeadSeriesRef ( ref ) ,
Labels : labels . FromStrings ( "__name__" , fmt . Sprintf ( "metric_%d" , i ) ) ,
} ,
} , nil )
require . NoError ( t , w . Log ( series ) )
for k := 0 ; k < samplesCount ; k ++ {
inner := rand . Intn ( ref + 1 )
sample := enc . Samples ( [ ] record . RefSample {
{
Ref : chunks . HeadSeriesRef ( inner ) ,
T : int64 ( i ) ,
V : float64 ( i ) ,
} ,
} , nil )
require . NoError ( t , w . Log ( sample ) )
}
}
}
// Generate one segment initially to ensure that watcher.Run() finds at least one segment on disk.
require . NoError ( t , generateWALRecords ( w , 0 , seriesCount , samplesCount ) )
w . NextSegment ( ) // Force creation of the next segment
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
for i := 1 ; i < segments ; i ++ {
for j := 0 ; j < seriesCount ; j ++ {
ref := j + ( i * 100 )
series := enc . Series ( [ ] record . RefSeries {
{
Ref : chunks . HeadSeriesRef ( ref ) ,
Labels : labels . FromStrings ( "__name__" , fmt . Sprintf ( "metric_%d" , i ) ) ,
} ,
} , nil )
require . NoError ( t , w . Log ( series ) )
for k := 0 ; k < samplesCount ; k ++ {
inner := rand . Intn ( ref + 1 )
sample := enc . Samples ( [ ] record . RefSample {
{
Ref : chunks . HeadSeriesRef ( inner ) ,
T : int64 ( i ) ,
V : float64 ( i ) ,
} ,
} , nil )
require . NoError ( t , w . Log ( sample ) )
}
}
for i := 1 ; i < segmentsToWrite ; i ++ {
require . NoError ( t , generateWALRecords ( w , i , seriesCount , samplesCount ) )
w . NextSegment ( )
}
} ( )
wt := newWriteToMock ( time . Millisecond )
watcher := NewWatcher ( wMetrics , nil , nil , "" , wt , dir , false , false , false )
watcher . MaxSegment = segments
watcher . MaxSegment = segmentsToRead
watcher . setMetrics ( )
startTime := time . Now ( )
err = watcher . Run ( )
wg . Wait ( )
require . Less ( t , time . Since ( startTime ) , readTimeout )
// But samples records shouldn't get dropped
retry ( t , defaultRetryInterval , defaultRetries , func ( ) bool {
return wt . checkNumSeries ( ) > 0
} )
require . Equal ( t , segmentsToRead * seriesCount * samplesCount , wt . samplesAppended )
require . NoError ( t , err )
require . NoError ( t , w . Close ( ) )
} )