2017-05-10 09:44:13 +00:00
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
2018-05-29 08:51:29 +00:00
"context"
2017-05-10 09:44:13 +00:00
"fmt"
2019-02-20 04:03:41 +00:00
"math"
2024-06-21 21:19:58 +00:00
"math/rand"
2019-02-14 01:11:17 +00:00
"os"
2022-03-03 22:53:36 +00:00
"runtime/pprof"
2019-02-20 07:51:08 +00:00
"sort"
"strconv"
2020-04-30 22:54:02 +00:00
"strings"
2017-05-10 09:44:13 +00:00
"sync"
"testing"
"time"
2021-06-11 16:17:59 +00:00
"github.com/go-kit/log"
2018-09-07 21:26:04 +00:00
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
2024-06-21 21:19:58 +00:00
"github.com/google/go-cmp/cmp"
2020-04-30 22:54:02 +00:00
"github.com/prometheus/client_golang/prometheus"
2019-04-24 09:46:31 +00:00
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
2017-05-10 09:44:13 +00:00
"github.com/prometheus/common/model"
2020-10-29 09:43:23 +00:00
"github.com/stretchr/testify/require"
2020-10-22 09:00:08 +00:00
"go.uber.org/atomic"
2017-10-23 13:57:44 +00:00
"github.com/prometheus/prometheus/config"
2022-07-14 13:13:12 +00:00
"github.com/prometheus/prometheus/model/histogram"
2021-11-08 14:23:17 +00:00
"github.com/prometheus/prometheus/model/labels"
2024-01-29 12:52:03 +00:00
"github.com/prometheus/prometheus/model/relabel"
2021-11-08 14:23:17 +00:00
"github.com/prometheus/prometheus/model/timestamp"
2017-10-23 20:28:17 +00:00
"github.com/prometheus/prometheus/prompb"
2020-11-19 15:23:03 +00:00
"github.com/prometheus/prometheus/scrape"
2021-11-06 10:10:04 +00:00
"github.com/prometheus/prometheus/tsdb/chunks"
2019-09-19 09:15:41 +00:00
"github.com/prometheus/prometheus/tsdb/record"
2024-01-24 16:48:22 +00:00
"github.com/prometheus/prometheus/util/testutil"
2017-05-10 09:44:13 +00:00
)
2018-05-23 14:03:54 +00:00
const defaultFlushDeadline = 1 * time . Minute
2020-10-15 21:53:59 +00:00
func newHighestTimestampMetric ( ) * maxTimestamp {
return & maxTimestamp {
2020-09-24 18:44:18 +00:00
Gauge : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "highest_timestamp_in_seconds" ,
Help : "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch." ,
} ) ,
}
}
2017-05-10 09:44:13 +00:00
func TestSampleDelivery ( t * testing . T ) {
2021-05-06 20:53:52 +00:00
testcases := [ ] struct {
2023-01-13 11:09:20 +00:00
name string
samples bool
exemplars bool
histograms bool
floatHistograms bool
2021-05-06 20:53:52 +00:00
} {
2023-01-13 11:09:20 +00:00
{ samples : true , exemplars : false , histograms : false , floatHistograms : false , name : "samples only" } ,
{ samples : true , exemplars : true , histograms : true , floatHistograms : true , name : "samples, exemplars, and histograms" } ,
{ samples : false , exemplars : true , histograms : false , floatHistograms : false , name : "exemplars only" } ,
{ samples : false , exemplars : false , histograms : true , floatHistograms : false , name : "histograms only" } ,
{ samples : false , exemplars : false , histograms : false , floatHistograms : true , name : "float histograms only" } ,
2021-05-06 20:53:52 +00:00
}
2017-05-10 09:44:13 +00:00
2021-05-06 20:53:52 +00:00
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := 3
2019-02-14 01:11:17 +00:00
2021-11-19 20:21:45 +00:00
dir := t . TempDir ( )
2019-02-14 01:11:17 +00:00
2020-11-19 15:23:03 +00:00
s := NewStorage ( nil , nil , nil , dir , defaultFlushDeadline , nil )
2020-08-11 19:37:03 +00:00
defer s . Close ( )
2017-05-10 09:44:13 +00:00
2021-05-06 20:53:52 +00:00
queueConfig := config . DefaultQueueConfig
queueConfig . BatchSendDeadline = model . Duration ( 100 * time . Millisecond )
queueConfig . MaxShards = 1
2020-08-11 19:37:03 +00:00
// We need to set URL's so that metric creation doesn't panic.
2022-06-30 16:20:16 +00:00
writeConfig := baseRemoteWriteConfig ( "http://test-storage.com" )
2020-08-11 19:37:03 +00:00
writeConfig . QueueConfig = queueConfig
2021-05-06 20:53:52 +00:00
writeConfig . SendExemplars = true
2022-07-14 13:13:12 +00:00
writeConfig . SendNativeHistograms = true
2020-08-11 19:37:03 +00:00
2021-05-06 20:53:52 +00:00
conf := & config . Config {
GlobalConfig : config . DefaultGlobalConfig ,
RemoteWriteConfigs : [ ] * config . RemoteWriteConfig {
2022-06-30 16:20:16 +00:00
writeConfig ,
2021-05-06 20:53:52 +00:00
} ,
}
for _ , tc := range testcases {
t . Run ( tc . name , func ( t * testing . T ) {
var (
2023-01-13 11:09:20 +00:00
series [ ] record . RefSeries
samples [ ] record . RefSample
exemplars [ ] record . RefExemplar
histograms [ ] record . RefHistogramSample
floatHistograms [ ] record . RefFloatHistogramSample
2021-05-06 20:53:52 +00:00
)
2017-05-10 09:44:13 +00:00
2021-05-06 20:53:52 +00:00
// Generates same series in both cases.
if tc . samples {
samples , series = createTimeseries ( n , n )
}
if tc . exemplars {
exemplars , series = createExemplars ( n , n )
}
2022-07-14 13:13:12 +00:00
if tc . histograms {
2023-01-13 11:09:20 +00:00
histograms , _ , series = createHistograms ( n , n , false )
}
if tc . floatHistograms {
_ , floatHistograms , series = createHistograms ( n , n , true )
2022-07-14 13:13:12 +00:00
}
2021-05-06 20:53:52 +00:00
// Apply new config.
queueConfig . Capacity = len ( samples )
queueConfig . MaxSamplesPerSend = len ( samples ) / 2
require . NoError ( t , s . ApplyConfig ( conf ) )
hash , err := toHash ( writeConfig )
require . NoError ( t , err )
qm := s . rws . queues [ hash ]
c := NewTestWriteClient ( )
qm . SetClient ( c )
qm . StoreSeries ( series , 0 )
// Send first half of data.
c . expectSamples ( samples [ : len ( samples ) / 2 ] , series )
c . expectExemplars ( exemplars [ : len ( exemplars ) / 2 ] , series )
2022-07-14 13:13:12 +00:00
c . expectHistograms ( histograms [ : len ( histograms ) / 2 ] , series )
2023-01-13 11:09:20 +00:00
c . expectFloatHistograms ( floatHistograms [ : len ( floatHistograms ) / 2 ] , series )
2021-05-06 20:53:52 +00:00
qm . Append ( samples [ : len ( samples ) / 2 ] )
qm . AppendExemplars ( exemplars [ : len ( exemplars ) / 2 ] )
2022-07-14 13:13:12 +00:00
qm . AppendHistograms ( histograms [ : len ( histograms ) / 2 ] )
2023-01-13 11:09:20 +00:00
qm . AppendFloatHistograms ( floatHistograms [ : len ( floatHistograms ) / 2 ] )
2021-05-06 20:53:52 +00:00
c . waitForExpectedData ( t )
// Send second half of data.
c . expectSamples ( samples [ len ( samples ) / 2 : ] , series )
c . expectExemplars ( exemplars [ len ( exemplars ) / 2 : ] , series )
2022-07-14 13:13:12 +00:00
c . expectHistograms ( histograms [ len ( histograms ) / 2 : ] , series )
2023-01-13 11:09:20 +00:00
c . expectFloatHistograms ( floatHistograms [ len ( floatHistograms ) / 2 : ] , series )
2021-05-06 20:53:52 +00:00
qm . Append ( samples [ len ( samples ) / 2 : ] )
qm . AppendExemplars ( exemplars [ len ( exemplars ) / 2 : ] )
2022-07-14 13:13:12 +00:00
qm . AppendHistograms ( histograms [ len ( histograms ) / 2 : ] )
2023-01-13 11:09:20 +00:00
qm . AppendFloatHistograms ( floatHistograms [ len ( floatHistograms ) / 2 : ] )
2021-05-06 20:53:52 +00:00
c . waitForExpectedData ( t )
} )
}
2017-05-10 09:44:13 +00:00
}
2023-03-27 15:32:25 +00:00
func newTestClientAndQueueManager ( t testing . TB , flushDeadline time . Duration ) ( * TestWriteClient , * QueueManager ) {
2020-11-19 15:23:03 +00:00
c := NewTestWriteClient ( )
cfg := config . DefaultQueueConfig
mcfg := config . DefaultMetadataConfig
2023-03-27 15:32:25 +00:00
return c , newTestQueueManager ( t , cfg , mcfg , flushDeadline , c )
}
2020-11-19 15:23:03 +00:00
2023-03-27 15:32:25 +00:00
func newTestQueueManager ( t testing . TB , cfg config . QueueConfig , mcfg config . MetadataConfig , deadline time . Duration , c WriteClient ) * QueueManager {
dir := t . TempDir ( )
2020-11-19 15:23:03 +00:00
metrics := newQueueManagerMetrics ( nil , "" , "" )
2023-03-27 15:32:25 +00:00
m := NewQueueManager ( metrics , nil , nil , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , mcfg , labels . EmptyLabels ( ) , nil , c , deadline , newPool ( ) , newHighestTimestampMetric ( ) , nil , false , false )
return m
}
func TestMetadataDelivery ( t * testing . T ) {
c , m := newTestClientAndQueueManager ( t , defaultFlushDeadline )
2020-11-19 15:23:03 +00:00
m . Start ( )
defer m . Stop ( )
2021-06-24 22:39:50 +00:00
metadata := [ ] scrape . MetricMetadata { }
numMetadata := 1532
for i := 0 ; i < numMetadata ; i ++ {
metadata = append ( metadata , scrape . MetricMetadata {
Metric : "prometheus_remote_storage_sent_metadata_bytes_total_" + strconv . Itoa ( i ) ,
2023-11-22 14:39:21 +00:00
Type : model . MetricTypeCounter ,
2020-11-19 15:23:03 +00:00
Help : "a nice help text" ,
Unit : "" ,
2021-06-24 22:39:50 +00:00
} )
}
m . AppendMetadata ( context . Background ( ) , metadata )
2020-11-19 15:23:03 +00:00
2023-12-07 11:35:01 +00:00
require . Len ( t , c . receivedMetadata , numMetadata )
2021-06-24 22:39:50 +00:00
// One more write than the rounded qoutient should be performed in order to get samples that didn't
// fit into MaxSamplesPerSend.
2023-03-27 15:32:25 +00:00
require . Equal ( t , numMetadata / config . DefaultMetadataConfig . MaxSamplesPerSend + 1 , c . writesReceived )
2021-06-24 22:39:50 +00:00
// Make sure the last samples were sent.
require . Equal ( t , c . receivedMetadata [ metadata [ len ( metadata ) - 1 ] . Metric ] [ 0 ] . MetricFamilyName , metadata [ len ( metadata ) - 1 ] . Metric )
2020-11-19 15:23:03 +00:00
}
2018-03-12 15:35:43 +00:00
func TestSampleDeliveryTimeout ( t * testing . T ) {
2018-03-12 16:48:51 +00:00
// Let's send one less sample than batch size, and wait the timeout duration
2018-09-07 21:26:04 +00:00
n := 9
2020-02-25 19:10:57 +00:00
samples , series := createTimeseries ( n , n )
2018-03-12 15:35:43 +00:00
cfg := config . DefaultQueueConfig
cfg . MaxShards = 1
2018-08-24 14:55:21 +00:00
cfg . BatchSendDeadline = model . Duration ( 100 * time . Millisecond )
2019-02-14 01:11:17 +00:00
2023-03-27 15:32:25 +00:00
c := NewTestWriteClient ( )
m := newTestQueueManager ( t , cfg , config . DefaultMetadataConfig , defaultFlushDeadline , c )
2019-03-13 10:02:36 +00:00
m . StoreSeries ( series , 0 )
2018-03-12 15:35:43 +00:00
m . Start ( )
defer m . Stop ( )
// Send the samples twice, waiting for the samples in the meantime.
2018-09-07 21:26:04 +00:00
c . expectSamples ( samples , series )
m . Append ( samples )
2021-05-06 20:53:52 +00:00
c . waitForExpectedData ( t )
2018-03-12 15:35:43 +00:00
2018-09-07 21:26:04 +00:00
c . expectSamples ( samples , series )
m . Append ( samples )
2021-05-06 20:53:52 +00:00
c . waitForExpectedData ( t )
2018-03-12 15:35:43 +00:00
}
2017-05-10 09:44:13 +00:00
func TestSampleDeliveryOrder ( t * testing . T ) {
ts := 10
2017-10-23 13:57:44 +00:00
n := config . DefaultQueueConfig . MaxSamplesPerSend * ts
2019-09-19 09:15:41 +00:00
samples := make ( [ ] record . RefSample , 0 , n )
series := make ( [ ] record . RefSeries , 0 , n )
2017-05-10 09:44:13 +00:00
for i := 0 ; i < n ; i ++ {
2018-09-07 21:26:04 +00:00
name := fmt . Sprintf ( "test_metric_%d" , i % ts )
2019-09-19 09:15:41 +00:00
samples = append ( samples , record . RefSample {
2021-11-06 10:10:04 +00:00
Ref : chunks . HeadSeriesRef ( i ) ,
2018-09-07 21:26:04 +00:00
T : int64 ( i ) ,
V : float64 ( i ) ,
} )
2019-09-19 09:15:41 +00:00
series = append ( series , record . RefSeries {
2021-11-06 10:10:04 +00:00
Ref : chunks . HeadSeriesRef ( i ) ,
2022-07-21 16:41:20 +00:00
Labels : labels . FromStrings ( "__name__" , name ) ,
2017-05-10 09:44:13 +00:00
} )
}
2023-03-27 15:32:25 +00:00
c , m := newTestClientAndQueueManager ( t , defaultFlushDeadline )
2018-09-07 21:26:04 +00:00
c . expectSamples ( samples , series )
2019-03-13 10:02:36 +00:00
m . StoreSeries ( series , 0 )
2017-05-10 09:44:13 +00:00
m . Start ( )
defer m . Stop ( )
2018-09-07 21:26:04 +00:00
// These should be received by the client.
m . Append ( samples )
2021-05-06 20:53:52 +00:00
c . waitForExpectedData ( t )
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
func TestShutdown ( t * testing . T ) {
2019-02-19 16:43:58 +00:00
deadline := 1 * time . Second
2020-06-24 13:41:52 +00:00
c := NewTestBlockedWriteClient ( )
2017-05-10 09:44:13 +00:00
2020-11-19 15:23:03 +00:00
cfg := config . DefaultQueueConfig
mcfg := config . DefaultMetadataConfig
2020-03-20 16:34:15 +00:00
2023-03-27 15:32:25 +00:00
m := newTestQueueManager ( t , cfg , mcfg , deadline , c )
2020-02-25 19:10:57 +00:00
n := 2 * config . DefaultQueueConfig . MaxSamplesPerSend
samples , series := createTimeseries ( n , n )
2019-03-13 10:02:36 +00:00
m . StoreSeries ( series , 0 )
2018-09-07 21:26:04 +00:00
m . Start ( )
// Append blocks to guarantee delivery, so we do it in the background.
go func ( ) {
m . Append ( samples )
} ( )
2019-02-19 16:43:58 +00:00
time . Sleep ( 100 * time . Millisecond )
2018-09-07 21:26:04 +00:00
// Test to ensure that Stop doesn't block.
start := time . Now ( )
m . Stop ( )
// The samples will never be delivered, so duration should
// be at least equal to deadline, otherwise the flush deadline
// was not respected.
duration := time . Since ( start )
2021-07-28 08:03:46 +00:00
if duration > deadline + ( deadline / 10 ) {
2018-09-07 21:26:04 +00:00
t . Errorf ( "Took too long to shutdown: %s > %s" , duration , deadline )
}
2021-07-28 08:03:46 +00:00
if duration < deadline {
2018-09-07 21:26:04 +00:00
t . Errorf ( "Shutdown occurred before flush deadline: %s < %s" , duration , deadline )
2017-05-10 09:44:13 +00:00
}
}
2018-09-07 21:26:04 +00:00
func TestSeriesReset ( t * testing . T ) {
2020-06-24 13:41:52 +00:00
c := NewTestBlockedWriteClient ( )
2018-09-07 21:26:04 +00:00
deadline := 5 * time . Second
numSegments := 4
numSeries := 25
2020-11-19 15:23:03 +00:00
cfg := config . DefaultQueueConfig
mcfg := config . DefaultMetadataConfig
2023-03-27 15:32:25 +00:00
m := newTestQueueManager ( t , cfg , mcfg , deadline , c )
2018-09-07 21:26:04 +00:00
for i := 0 ; i < numSegments ; i ++ {
2019-09-19 09:15:41 +00:00
series := [ ] record . RefSeries { }
2018-09-07 21:26:04 +00:00
for j := 0 ; j < numSeries ; j ++ {
2022-07-21 16:41:20 +00:00
series = append ( series , record . RefSeries { Ref : chunks . HeadSeriesRef ( ( i * 100 ) + j ) , Labels : labels . FromStrings ( "a" , "a" ) } )
2018-09-07 21:26:04 +00:00
}
m . StoreSeries ( series , i )
2018-05-29 08:51:29 +00:00
}
2023-12-07 11:35:01 +00:00
require . Len ( t , m . seriesLabels , numSegments * numSeries )
2018-09-07 21:26:04 +00:00
m . SeriesReset ( 2 )
2023-12-07 11:35:01 +00:00
require . Len ( t , m . seriesLabels , numSegments * numSeries / 2 )
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
func TestReshard ( t * testing . T ) {
size := 10 // Make bigger to find more races.
2020-02-25 19:10:57 +00:00
nSeries := 6
nSamples := config . DefaultQueueConfig . Capacity * size
samples , series := createTimeseries ( nSamples , nSeries )
2018-09-07 21:26:04 +00:00
cfg := config . DefaultQueueConfig
cfg . MaxShards = 1
2023-03-27 15:32:25 +00:00
c := NewTestWriteClient ( )
m := newTestQueueManager ( t , cfg , config . DefaultMetadataConfig , defaultFlushDeadline , c )
c . expectSamples ( samples , series )
2019-03-13 10:02:36 +00:00
m . StoreSeries ( series , 0 )
2018-09-07 21:26:04 +00:00
m . Start ( )
defer m . Stop ( )
go func ( ) {
for i := 0 ; i < len ( samples ) ; i += config . DefaultQueueConfig . Capacity {
sent := m . Append ( samples [ i : i + config . DefaultQueueConfig . Capacity ] )
2020-10-29 09:43:23 +00:00
require . True ( t , sent , "samples not sent" )
2018-09-07 21:26:04 +00:00
time . Sleep ( 100 * time . Millisecond )
}
} ( )
for i := 1 ; i < len ( samples ) / config . DefaultQueueConfig . Capacity ; i ++ {
m . shards . stop ( )
m . shards . start ( i )
time . Sleep ( 100 * time . Millisecond )
}
2021-05-06 20:53:52 +00:00
c . waitForExpectedData ( t )
2017-05-10 09:44:13 +00:00
}
2023-03-27 15:32:25 +00:00
func TestReshardRaceWithStop ( t * testing . T ) {
2020-06-24 13:41:52 +00:00
c := NewTestWriteClient ( )
2019-04-16 10:25:19 +00:00
var m * QueueManager
h := sync . Mutex { }
h . Lock ( )
2020-11-19 15:23:03 +00:00
cfg := config . DefaultQueueConfig
mcfg := config . DefaultMetadataConfig
2021-12-30 04:48:11 +00:00
exitCh := make ( chan struct { } )
2019-04-16 10:25:19 +00:00
go func ( ) {
for {
2023-03-27 15:32:25 +00:00
m = newTestQueueManager ( t , cfg , mcfg , defaultFlushDeadline , c )
2019-04-16 10:25:19 +00:00
m . Start ( )
h . Unlock ( )
h . Lock ( )
m . Stop ( )
2021-12-30 04:48:11 +00:00
select {
case exitCh <- struct { } { } :
return
default :
}
2019-04-16 10:25:19 +00:00
}
} ( )
for i := 1 ; i < 100 ; i ++ {
h . Lock ( )
m . reshardChan <- i
h . Unlock ( )
}
2021-12-30 04:48:11 +00:00
<- exitCh
2019-04-16 10:25:19 +00:00
}
2022-02-11 14:07:41 +00:00
func TestReshardPartialBatch ( t * testing . T ) {
samples , series := createTimeseries ( 1 , 10 )
c := NewTestBlockedWriteClient ( )
cfg := config . DefaultQueueConfig
mcfg := config . DefaultMetadataConfig
cfg . MaxShards = 1
batchSendDeadline := time . Millisecond
flushDeadline := 10 * time . Millisecond
cfg . BatchSendDeadline = model . Duration ( batchSendDeadline )
2023-03-27 15:32:25 +00:00
m := newTestQueueManager ( t , cfg , mcfg , flushDeadline , c )
2022-02-11 14:07:41 +00:00
m . StoreSeries ( series , 0 )
m . Start ( )
for i := 0 ; i < 100 ; i ++ {
done := make ( chan struct { } )
go func ( ) {
m . Append ( samples )
time . Sleep ( batchSendDeadline )
m . shards . stop ( )
m . shards . start ( 1 )
done <- struct { } { }
} ( )
select {
case <- done :
case <- time . After ( 2 * time . Second ) :
t . Error ( "Deadlock between sending and stopping detected" )
2022-03-03 22:53:36 +00:00
pprof . Lookup ( "goroutine" ) . WriteTo ( os . Stdout , 1 )
2022-02-11 14:07:41 +00:00
t . FailNow ( )
}
}
// We can only call stop if there was not a deadlock.
m . Stop ( )
}
2022-03-03 22:53:36 +00:00
// TestQueueFilledDeadlock makes sure the code does not deadlock in the case
// where a large scrape (> capacity + max samples per send) is appended at the
// same time as a batch times out according to the batch send deadline.
func TestQueueFilledDeadlock ( t * testing . T ) {
samples , series := createTimeseries ( 50 , 1 )
c := NewNopWriteClient ( )
cfg := config . DefaultQueueConfig
mcfg := config . DefaultMetadataConfig
cfg . MaxShards = 1
cfg . MaxSamplesPerSend = 10
cfg . Capacity = 20
flushDeadline := time . Second
batchSendDeadline := time . Millisecond
cfg . BatchSendDeadline = model . Duration ( batchSendDeadline )
2023-03-27 15:32:25 +00:00
m := newTestQueueManager ( t , cfg , mcfg , flushDeadline , c )
2022-03-03 22:53:36 +00:00
m . StoreSeries ( series , 0 )
m . Start ( )
defer m . Stop ( )
for i := 0 ; i < 100 ; i ++ {
done := make ( chan struct { } )
go func ( ) {
time . Sleep ( batchSendDeadline )
m . Append ( samples )
done <- struct { } { }
} ( )
select {
case <- done :
case <- time . After ( 2 * time . Second ) :
t . Error ( "Deadlock between sending and appending detected" )
pprof . Lookup ( "goroutine" ) . WriteTo ( os . Stdout , 1 )
t . FailNow ( )
}
}
}
2019-04-24 09:46:31 +00:00
func TestReleaseNoninternedString ( t * testing . T ) {
2023-03-27 15:32:25 +00:00
_ , m := newTestClientAndQueueManager ( t , defaultFlushDeadline )
2019-04-24 09:46:31 +00:00
m . Start ( )
2021-10-29 22:44:32 +00:00
defer m . Stop ( )
2019-04-24 09:46:31 +00:00
for i := 1 ; i < 1000 ; i ++ {
2019-09-19 09:15:41 +00:00
m . StoreSeries ( [ ] record . RefSeries {
2019-08-13 08:34:14 +00:00
{
2022-07-21 16:41:20 +00:00
Ref : chunks . HeadSeriesRef ( i ) ,
2024-05-13 15:36:19 +00:00
Labels : labels . FromStrings ( "asdf" , strconv . Itoa ( i ) ) ,
2019-04-24 09:46:31 +00:00
} ,
} , 0 )
2019-06-27 18:48:21 +00:00
m . SeriesReset ( 1 )
2019-04-24 09:46:31 +00:00
}
metric := client_testutil . ToFloat64 ( noReferenceReleases )
2020-10-29 09:43:23 +00:00
require . Equal ( t , 0.0 , metric , "expected there to be no calls to release for strings that were not already interned: %d" , int ( metric ) )
2019-04-24 09:46:31 +00:00
}
2020-04-20 22:20:39 +00:00
func TestShouldReshard ( t * testing . T ) {
2019-10-21 21:54:25 +00:00
type testcase struct {
2020-04-20 22:20:39 +00:00
startingShards int
samplesIn , samplesOut , lastSendTimestamp int64
expectedToReshard bool
2019-10-21 21:54:25 +00:00
}
cases := [ ] testcase {
{
2020-04-20 22:20:39 +00:00
// Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago.
startingShards : 10 ,
samplesIn : 1000 ,
samplesOut : 10 ,
lastSendTimestamp : time . Now ( ) . Unix ( ) - int64 ( 3 * time . Duration ( config . DefaultQueueConfig . BatchSendDeadline ) / time . Second ) ,
expectedToReshard : false ,
2019-10-21 21:54:25 +00:00
} ,
{
2020-04-20 22:20:39 +00:00
startingShards : 5 ,
samplesIn : 1000 ,
samplesOut : 10 ,
lastSendTimestamp : time . Now ( ) . Unix ( ) ,
expectedToReshard : true ,
2019-10-21 21:54:25 +00:00
} ,
}
2020-11-19 15:23:03 +00:00
2019-10-21 21:54:25 +00:00
for _ , c := range cases {
2023-03-27 15:32:25 +00:00
_ , m := newTestClientAndQueueManager ( t , defaultFlushDeadline )
2019-10-21 21:54:25 +00:00
m . numShards = c . startingShards
2021-05-06 20:53:52 +00:00
m . dataIn . incr ( c . samplesIn )
m . dataOut . incr ( c . samplesOut )
2020-07-30 07:45:42 +00:00
m . lastSendTimestamp . Store ( c . lastSendTimestamp )
2019-10-21 21:54:25 +00:00
m . Start ( )
2020-04-20 22:20:39 +00:00
2019-10-21 21:54:25 +00:00
desiredShards := m . calculateDesiredShards ( )
2020-04-20 22:20:39 +00:00
shouldReshard := m . shouldReshard ( desiredShards )
2019-10-21 21:54:25 +00:00
m . Stop ( )
2020-04-20 22:20:39 +00:00
2020-10-29 09:43:23 +00:00
require . Equal ( t , c . expectedToReshard , shouldReshard )
2019-10-21 21:54:25 +00:00
}
}
2024-02-28 22:28:39 +00:00
// TestDisableReshardOnRetry asserts that resharding should be disabled when a
// recoverable error is returned from remote_write.
func TestDisableReshardOnRetry ( t * testing . T ) {
onStoredContext , onStoreCalled := context . WithCancel ( context . Background ( ) )
defer onStoreCalled ( )
var (
fakeSamples , fakeSeries = createTimeseries ( 100 , 100 )
cfg = config . DefaultQueueConfig
mcfg = config . DefaultMetadataConfig
retryAfter = time . Second
metrics = newQueueManagerMetrics ( nil , "" , "" )
client = & MockWriteClient {
StoreFunc : func ( ctx context . Context , b [ ] byte , i int ) error {
onStoreCalled ( )
return RecoverableError {
error : fmt . Errorf ( "fake error" ) ,
retryAfter : model . Duration ( retryAfter ) ,
}
} ,
NameFunc : func ( ) string { return "mock" } ,
EndpointFunc : func ( ) string { return "http://fake:9090/api/v1/write" } ,
}
)
m := NewQueueManager ( metrics , nil , nil , nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , mcfg , labels . EmptyLabels ( ) , nil , client , 0 , newPool ( ) , newHighestTimestampMetric ( ) , nil , false , false )
m . StoreSeries ( fakeSeries , 0 )
// Attempt to samples while the manager is running. We immediately stop the
// manager after the recoverable error is generated to prevent the manager
// from resharding itself.
m . Start ( )
{
m . Append ( fakeSamples )
select {
case <- onStoredContext . Done ( ) :
case <- time . After ( time . Minute ) :
require . FailNow ( t , "timed out waiting for client to be sent metrics" )
}
}
m . Stop ( )
require . Eventually ( t , func ( ) bool {
// Force m.lastSendTimestamp to be current so the last send timestamp isn't
// the reason resharding is disabled.
m . lastSendTimestamp . Store ( time . Now ( ) . Unix ( ) )
return m . shouldReshard ( m . numShards + 1 ) == false
} , time . Minute , 10 * time . Millisecond , "shouldReshard was never disabled" )
// After 2x retryAfter, resharding should be enabled again.
require . Eventually ( t , func ( ) bool {
// Force m.lastSendTimestamp to be current so the last send timestamp isn't
// the reason resharding is disabled.
m . lastSendTimestamp . Store ( time . Now ( ) . Unix ( ) )
return m . shouldReshard ( m . numShards + 1 ) == true
} , time . Minute , retryAfter , "shouldReshard should have been re-enabled" )
}
2021-07-27 20:21:48 +00:00
func createTimeseries ( numSamples , numSeries int , extraLabels ... labels . Label ) ( [ ] record . RefSample , [ ] record . RefSeries ) {
2020-02-25 19:10:57 +00:00
samples := make ( [ ] record . RefSample , 0 , numSamples )
series := make ( [ ] record . RefSeries , 0 , numSeries )
2023-03-27 15:27:35 +00:00
lb := labels . NewScratchBuilder ( 1 + len ( extraLabels ) )
2020-02-25 19:10:57 +00:00
for i := 0 ; i < numSeries ; i ++ {
2018-09-07 21:26:04 +00:00
name := fmt . Sprintf ( "test_metric_%d" , i )
2020-02-25 19:10:57 +00:00
for j := 0 ; j < numSamples ; j ++ {
samples = append ( samples , record . RefSample {
2021-11-06 10:10:04 +00:00
Ref : chunks . HeadSeriesRef ( i ) ,
2020-02-25 19:10:57 +00:00
T : int64 ( j ) ,
V : float64 ( i ) ,
} )
}
2022-03-09 22:35:14 +00:00
// Create Labels that is name of series plus any extra labels supplied.
2024-01-05 18:40:30 +00:00
lb . Reset ( )
lb . Add ( labels . MetricName , name )
2022-03-09 22:35:14 +00:00
for _ , l := range extraLabels {
2024-01-05 18:40:30 +00:00
lb . Add ( l . Name , l . Value )
2022-03-09 22:35:14 +00:00
}
2024-01-05 18:40:30 +00:00
lb . Sort ( )
2019-09-19 09:15:41 +00:00
series = append ( series , record . RefSeries {
2021-11-06 10:10:04 +00:00
Ref : chunks . HeadSeriesRef ( i ) ,
2024-01-05 18:40:30 +00:00
Labels : lb . Labels ( ) ,
2018-09-07 21:26:04 +00:00
} )
}
return samples , series
2017-05-10 09:44:13 +00:00
}
2024-06-21 21:19:58 +00:00
func createProtoTimeseriesWithOld ( numSamples , baseTs int64 , extraLabels ... labels . Label ) [ ] prompb . TimeSeries {
samples := make ( [ ] prompb . TimeSeries , numSamples )
// use a fixed rand source so tests are consistent
r := rand . New ( rand . NewSource ( 99 ) )
for j := int64 ( 0 ) ; j < numSamples ; j ++ {
name := fmt . Sprintf ( "test_metric_%d" , j )
samples [ j ] = prompb . TimeSeries {
Labels : [ ] prompb . Label { { Name : "__name__" , Value : name } } ,
Samples : [ ] prompb . Sample {
{
Timestamp : baseTs + j ,
Value : float64 ( j ) ,
} ,
} ,
}
// 10% of the time use a ts that is too old
if r . Intn ( 10 ) == 0 {
samples [ j ] . Samples [ 0 ] . Timestamp = baseTs - 5
}
}
return samples
}
2021-05-06 20:53:52 +00:00
func createExemplars ( numExemplars , numSeries int ) ( [ ] record . RefExemplar , [ ] record . RefSeries ) {
exemplars := make ( [ ] record . RefExemplar , 0 , numExemplars )
series := make ( [ ] record . RefSeries , 0 , numSeries )
for i := 0 ; i < numSeries ; i ++ {
name := fmt . Sprintf ( "test_metric_%d" , i )
for j := 0 ; j < numExemplars ; j ++ {
e := record . RefExemplar {
2021-11-06 10:10:04 +00:00
Ref : chunks . HeadSeriesRef ( i ) ,
2021-05-06 20:53:52 +00:00
T : int64 ( j ) ,
V : float64 ( i ) ,
2024-02-15 14:19:54 +00:00
Labels : labels . FromStrings ( "trace_id" , fmt . Sprintf ( "trace-%d" , i ) ) ,
2021-05-06 20:53:52 +00:00
}
exemplars = append ( exemplars , e )
}
series = append ( series , record . RefSeries {
2021-11-06 10:10:04 +00:00
Ref : chunks . HeadSeriesRef ( i ) ,
2022-07-21 16:41:20 +00:00
Labels : labels . FromStrings ( "__name__" , name ) ,
2021-05-06 20:53:52 +00:00
} )
}
return exemplars , series
}
2023-01-13 11:09:20 +00:00
func createHistograms ( numSamples , numSeries int , floatHistogram bool ) ( [ ] record . RefHistogramSample , [ ] record . RefFloatHistogramSample , [ ] record . RefSeries ) {
2022-08-29 12:08:36 +00:00
histograms := make ( [ ] record . RefHistogramSample , 0 , numSamples )
2023-01-13 11:09:20 +00:00
floatHistograms := make ( [ ] record . RefFloatHistogramSample , 0 , numSamples )
2022-07-14 13:13:12 +00:00
series := make ( [ ] record . RefSeries , 0 , numSeries )
for i := 0 ; i < numSeries ; i ++ {
name := fmt . Sprintf ( "test_metric_%d" , i )
for j := 0 ; j < numSamples ; j ++ {
2023-01-13 11:09:20 +00:00
hist := & histogram . Histogram {
Schema : 2 ,
ZeroThreshold : 1e-128 ,
ZeroCount : 0 ,
Count : 2 ,
Sum : 0 ,
PositiveSpans : [ ] histogram . Span { { Offset : 0 , Length : 1 } } ,
PositiveBuckets : [ ] int64 { int64 ( i ) + 1 } ,
NegativeSpans : [ ] histogram . Span { { Offset : 0 , Length : 1 } } ,
NegativeBuckets : [ ] int64 { int64 ( - i ) - 1 } ,
}
if floatHistogram {
fh := record . RefFloatHistogramSample {
Ref : chunks . HeadSeriesRef ( i ) ,
T : int64 ( j ) ,
2023-11-29 14:15:57 +00:00
FH : hist . ToFloat ( nil ) ,
2023-01-13 11:09:20 +00:00
}
floatHistograms = append ( floatHistograms , fh )
} else {
h := record . RefHistogramSample {
Ref : chunks . HeadSeriesRef ( i ) ,
T : int64 ( j ) ,
H : hist ,
}
histograms = append ( histograms , h )
2022-07-14 13:13:12 +00:00
}
}
series = append ( series , record . RefSeries {
Ref : chunks . HeadSeriesRef ( i ) ,
2022-03-09 22:35:14 +00:00
Labels : labels . FromStrings ( "__name__" , name ) ,
2022-07-14 13:13:12 +00:00
} )
}
2023-01-13 11:09:20 +00:00
if floatHistogram {
return nil , floatHistograms , series
}
return histograms , nil , series
2022-07-14 13:13:12 +00:00
}
2024-06-21 21:19:58 +00:00
func getSeriesIDFromRef ( r record . RefSeries ) string {
return r . Labels . String ( )
2017-05-10 09:44:13 +00:00
}
2020-06-24 13:41:52 +00:00
type TestWriteClient struct {
2023-01-13 11:09:20 +00:00
receivedSamples map [ string ] [ ] prompb . Sample
expectedSamples map [ string ] [ ] prompb . Sample
receivedExemplars map [ string ] [ ] prompb . Exemplar
expectedExemplars map [ string ] [ ] prompb . Exemplar
receivedHistograms map [ string ] [ ] prompb . Histogram
receivedFloatHistograms map [ string ] [ ] prompb . Histogram
expectedHistograms map [ string ] [ ] prompb . Histogram
expectedFloatHistograms map [ string ] [ ] prompb . Histogram
receivedMetadata map [ string ] [ ] prompb . MetricMetadata
writesReceived int
withWaitGroup bool
wg sync . WaitGroup
mtx sync . Mutex
buf [ ] byte
2024-06-21 21:19:58 +00:00
storeWait time . Duration
returnError error
2018-09-07 21:26:04 +00:00
}
2017-05-10 09:44:13 +00:00
2020-06-24 13:41:52 +00:00
func NewTestWriteClient ( ) * TestWriteClient {
return & TestWriteClient {
2020-11-19 15:23:03 +00:00
withWaitGroup : true ,
receivedSamples : map [ string ] [ ] prompb . Sample { } ,
expectedSamples : map [ string ] [ ] prompb . Sample { } ,
receivedMetadata : map [ string ] [ ] prompb . MetricMetadata { } ,
2024-06-21 21:19:58 +00:00
storeWait : 0 ,
returnError : nil ,
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
}
2017-05-10 09:44:13 +00:00
2020-06-24 13:41:52 +00:00
func ( c * TestWriteClient ) expectSamples ( ss [ ] record . RefSample , series [ ] record . RefSeries ) {
2020-02-25 19:10:57 +00:00
if ! c . withWaitGroup {
return
}
2018-09-07 21:26:04 +00:00
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
2017-05-10 09:44:13 +00:00
2018-09-07 21:26:04 +00:00
c . expectedSamples = map [ string ] [ ] prompb . Sample { }
c . receivedSamples = map [ string ] [ ] prompb . Sample { }
2017-05-10 09:44:13 +00:00
2018-09-07 21:26:04 +00:00
for _ , s := range ss {
2024-06-21 21:19:58 +00:00
tsID := getSeriesIDFromRef ( series [ s . Ref ] )
c . expectedSamples [ tsID ] = append ( c . expectedSamples [ tsID ] , prompb . Sample {
2018-09-07 21:26:04 +00:00
Timestamp : s . T ,
Value : s . V ,
} )
}
2024-06-21 21:19:58 +00:00
if ! c . withWaitGroup {
return
}
2018-09-07 21:26:04 +00:00
c . wg . Add ( len ( ss ) )
}
2017-05-10 09:44:13 +00:00
2021-05-06 20:53:52 +00:00
func ( c * TestWriteClient ) expectExemplars ( ss [ ] record . RefExemplar , series [ ] record . RefSeries ) {
if ! c . withWaitGroup {
return
}
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
c . expectedExemplars = map [ string ] [ ] prompb . Exemplar { }
c . receivedExemplars = map [ string ] [ ] prompb . Exemplar { }
for _ , s := range ss {
2024-06-21 21:19:58 +00:00
tsID := getSeriesIDFromRef ( series [ s . Ref ] )
2021-05-06 20:53:52 +00:00
e := prompb . Exemplar {
2024-06-19 15:30:49 +00:00
Labels : LabelsToLabelsProto ( s . Labels , nil ) ,
2021-05-06 20:53:52 +00:00
Timestamp : s . T ,
Value : s . V ,
}
2024-06-21 21:19:58 +00:00
c . expectedExemplars [ tsID ] = append ( c . expectedExemplars [ tsID ] , e )
2021-05-06 20:53:52 +00:00
}
c . wg . Add ( len ( ss ) )
}
2022-08-29 12:08:36 +00:00
func ( c * TestWriteClient ) expectHistograms ( hh [ ] record . RefHistogramSample , series [ ] record . RefSeries ) {
2022-07-14 13:13:12 +00:00
if ! c . withWaitGroup {
return
}
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
c . expectedHistograms = map [ string ] [ ] prompb . Histogram { }
c . receivedHistograms = map [ string ] [ ] prompb . Histogram { }
for _ , h := range hh {
2024-06-21 21:19:58 +00:00
tsID := getSeriesIDFromRef ( series [ h . Ref ] )
c . expectedHistograms [ tsID ] = append ( c . expectedHistograms [ tsID ] , HistogramToHistogramProto ( h . T , h . H ) )
2022-07-14 13:13:12 +00:00
}
c . wg . Add ( len ( hh ) )
}
2023-01-13 11:09:20 +00:00
func ( c * TestWriteClient ) expectFloatHistograms ( fhs [ ] record . RefFloatHistogramSample , series [ ] record . RefSeries ) {
if ! c . withWaitGroup {
return
}
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
c . expectedFloatHistograms = map [ string ] [ ] prompb . Histogram { }
c . receivedFloatHistograms = map [ string ] [ ] prompb . Histogram { }
for _ , fh := range fhs {
2024-06-21 21:19:58 +00:00
tsID := getSeriesIDFromRef ( series [ fh . Ref ] )
c . expectedFloatHistograms [ tsID ] = append ( c . expectedFloatHistograms [ tsID ] , FloatHistogramToHistogramProto ( fh . T , fh . FH ) )
2023-01-13 11:09:20 +00:00
}
c . wg . Add ( len ( fhs ) )
}
2021-05-06 20:53:52 +00:00
func ( c * TestWriteClient ) waitForExpectedData ( tb testing . TB ) {
2020-02-25 19:10:57 +00:00
if ! c . withWaitGroup {
return
}
2018-09-07 21:26:04 +00:00
c . wg . Wait ( )
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
for ts , expectedSamples := range c . expectedSamples {
2020-10-29 09:43:23 +00:00
require . Equal ( tb , expectedSamples , c . receivedSamples [ ts ] , ts )
2017-05-10 09:44:13 +00:00
}
2021-05-06 20:53:52 +00:00
for ts , expectedExemplar := range c . expectedExemplars {
require . Equal ( tb , expectedExemplar , c . receivedExemplars [ ts ] , ts )
}
2022-07-14 13:13:12 +00:00
for ts , expectedHistogram := range c . expectedHistograms {
require . Equal ( tb , expectedHistogram , c . receivedHistograms [ ts ] , ts )
}
2023-01-13 11:09:20 +00:00
for ts , expectedFloatHistogram := range c . expectedFloatHistograms {
require . Equal ( tb , expectedFloatHistogram , c . receivedFloatHistograms [ ts ] , ts )
}
2018-09-07 21:26:04 +00:00
}
2017-05-10 09:44:13 +00:00
2024-06-21 21:19:58 +00:00
func ( c * TestWriteClient ) SetStoreWait ( w time . Duration ) {
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
c . storeWait = w
}
func ( c * TestWriteClient ) SetReturnError ( err error ) {
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
c . returnError = err
}
2023-09-20 10:11:03 +00:00
func ( c * TestWriteClient ) Store ( _ context . Context , req [ ] byte , _ int ) error {
2018-09-07 21:26:04 +00:00
c . mtx . Lock ( )
defer c . mtx . Unlock ( )
2024-06-21 21:19:58 +00:00
if c . storeWait > 0 {
time . Sleep ( c . storeWait )
}
if c . returnError != nil {
return c . returnError
}
2019-06-27 18:48:21 +00:00
// nil buffers are ok for snappy, ignore cast error.
if c . buf != nil {
c . buf = c . buf [ : cap ( c . buf ) ]
}
reqBuf , err := snappy . Decode ( c . buf , req )
c . buf = reqBuf
2018-09-07 21:26:04 +00:00
if err != nil {
return err
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
var reqProto prompb . WriteRequest
if err := proto . Unmarshal ( reqBuf , & reqProto ) ; err != nil {
return err
2017-05-10 09:44:13 +00:00
}
2023-03-27 15:27:35 +00:00
builder := labels . NewScratchBuilder ( 0 )
2018-09-07 21:26:04 +00:00
count := 0
for _ , ts := range reqProto . Timeseries {
2024-06-19 15:30:49 +00:00
labels := LabelProtosToLabels ( & builder , ts . Labels )
2024-06-21 21:19:58 +00:00
tsID := labels . String ( )
2018-09-07 21:26:04 +00:00
for _ , sample := range ts . Samples {
count ++
2024-06-21 21:19:58 +00:00
c . receivedSamples [ tsID ] = append ( c . receivedSamples [ tsID ] , sample )
2018-09-07 21:26:04 +00:00
}
2021-05-06 20:53:52 +00:00
for _ , ex := range ts . Exemplars {
count ++
2024-06-21 21:19:58 +00:00
c . receivedExemplars [ tsID ] = append ( c . receivedExemplars [ tsID ] , ex )
2021-05-06 20:53:52 +00:00
}
2022-07-14 13:13:12 +00:00
for _ , histogram := range ts . Histograms {
count ++
2023-04-21 18:27:15 +00:00
if histogram . IsFloatHistogram ( ) {
2024-06-21 21:19:58 +00:00
c . receivedFloatHistograms [ tsID ] = append ( c . receivedFloatHistograms [ tsID ] , histogram )
2023-01-13 11:09:20 +00:00
} else {
2024-06-21 21:19:58 +00:00
c . receivedHistograms [ tsID ] = append ( c . receivedHistograms [ tsID ] , histogram )
2023-01-13 11:09:20 +00:00
}
2022-07-14 13:13:12 +00:00
}
2017-05-10 09:44:13 +00:00
}
2020-02-25 19:10:57 +00:00
if c . withWaitGroup {
c . wg . Add ( - count )
}
2020-11-19 15:23:03 +00:00
for _ , m := range reqProto . Metadata {
c . receivedMetadata [ m . MetricFamilyName ] = append ( c . receivedMetadata [ m . MetricFamilyName ] , m )
}
2021-06-24 22:39:50 +00:00
c . writesReceived ++
2018-09-07 21:26:04 +00:00
return nil
2017-05-10 09:44:13 +00:00
}
2018-05-29 08:51:29 +00:00
2020-06-24 13:41:52 +00:00
func ( c * TestWriteClient ) Name ( ) string {
return "testwriteclient"
2018-09-07 21:26:04 +00:00
}
2018-05-29 08:51:29 +00:00
2020-06-24 13:41:52 +00:00
func ( c * TestWriteClient ) Endpoint ( ) string {
2019-12-12 20:47:23 +00:00
return "http://test-remote.com/1234"
}
2020-06-24 13:41:52 +00:00
// TestBlockingWriteClient is a queue_manager WriteClient which will block
2018-09-07 21:26:04 +00:00
// on any calls to Store(), until the request's Context is cancelled, at which
// point the `numCalls` property will contain a count of how many times Store()
// was called.
2020-06-24 13:41:52 +00:00
type TestBlockingWriteClient struct {
2020-07-30 07:45:42 +00:00
numCalls atomic . Uint64
2018-09-07 21:26:04 +00:00
}
2020-06-24 13:41:52 +00:00
func NewTestBlockedWriteClient ( ) * TestBlockingWriteClient {
return & TestBlockingWriteClient { }
2018-09-07 21:26:04 +00:00
}
2023-09-20 10:11:03 +00:00
func ( c * TestBlockingWriteClient ) Store ( ctx context . Context , _ [ ] byte , _ int ) error {
2020-07-30 07:45:42 +00:00
c . numCalls . Inc ( )
2018-09-07 21:26:04 +00:00
<- ctx . Done ( )
return nil
}
2020-06-24 13:41:52 +00:00
func ( c * TestBlockingWriteClient ) NumCalls ( ) uint64 {
2020-07-30 07:45:42 +00:00
return c . numCalls . Load ( )
2018-09-07 21:26:04 +00:00
}
2020-06-24 13:41:52 +00:00
func ( c * TestBlockingWriteClient ) Name ( ) string {
return "testblockingwriteclient"
2018-05-29 08:51:29 +00:00
}
2019-02-20 04:03:41 +00:00
2020-06-24 13:41:52 +00:00
func ( c * TestBlockingWriteClient ) Endpoint ( ) string {
2019-12-12 20:47:23 +00:00
return "http://test-remote-blocking.com/1234"
}
2021-11-30 12:21:53 +00:00
// For benchmarking the send and not the receive side.
type NopWriteClient struct { }
2023-09-20 10:11:03 +00:00
func NewNopWriteClient ( ) * NopWriteClient { return & NopWriteClient { } }
func ( c * NopWriteClient ) Store ( context . Context , [ ] byte , int ) error { return nil }
func ( c * NopWriteClient ) Name ( ) string { return "nopwriteclient" }
func ( c * NopWriteClient ) Endpoint ( ) string { return "http://test-remote.com/1234" }
2021-11-30 12:21:53 +00:00
2024-02-28 22:28:39 +00:00
type MockWriteClient struct {
StoreFunc func ( context . Context , [ ] byte , int ) error
NameFunc func ( ) string
EndpointFunc func ( ) string
}
func ( c * MockWriteClient ) Store ( ctx context . Context , bb [ ] byte , n int ) error {
return c . StoreFunc ( ctx , bb , n )
}
func ( c * MockWriteClient ) Name ( ) string { return c . NameFunc ( ) }
func ( c * MockWriteClient ) Endpoint ( ) string { return c . EndpointFunc ( ) }
2024-01-29 12:52:03 +00:00
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
var extraLabels [ ] labels . Label = [ ] labels . Label {
{ Name : "kubernetes_io_arch" , Value : "amd64" } ,
{ Name : "kubernetes_io_instance_type" , Value : "c3.somesize" } ,
{ Name : "kubernetes_io_os" , Value : "linux" } ,
{ Name : "container_name" , Value : "some-name" } ,
{ Name : "failure_domain_kubernetes_io_region" , Value : "somewhere-1" } ,
{ Name : "failure_domain_kubernetes_io_zone" , Value : "somewhere-1b" } ,
{ Name : "id" , Value : "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28" } ,
{ Name : "image" , Value : "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506" } ,
{ Name : "instance" , Value : "ip-111-11-1-11.ec2.internal" } ,
{ Name : "job" , Value : "kubernetes-cadvisor" } ,
{ Name : "kubernetes_io_hostname" , Value : "ip-111-11-1-11" } ,
{ Name : "monitor" , Value : "prod" } ,
{ Name : "name" , Value : "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0" } ,
{ Name : "namespace" , Value : "kube-system" } ,
{ Name : "pod_name" , Value : "some-other-name-5j8s8" } ,
}
2021-11-30 12:21:53 +00:00
func BenchmarkSampleSend ( b * testing . B ) {
2021-07-27 20:21:48 +00:00
// Send one sample per series, which is the typical remote_write case
const numSamples = 1
const numSeries = 10000
samples , series := createTimeseries ( numSamples , numSeries , extraLabels ... )
2019-06-27 18:48:21 +00:00
2021-11-30 12:21:53 +00:00
c := NewNopWriteClient ( )
2019-06-27 18:48:21 +00:00
cfg := config . DefaultQueueConfig
2020-11-19 15:23:03 +00:00
mcfg := config . DefaultMetadataConfig
2019-06-27 18:48:21 +00:00
cfg . BatchSendDeadline = model . Duration ( 100 * time . Millisecond )
2021-11-30 12:21:53 +00:00
cfg . MinShards = 20
cfg . MaxShards = 20
2019-06-27 18:48:21 +00:00
2023-03-27 15:32:25 +00:00
m := newTestQueueManager ( b , cfg , mcfg , defaultFlushDeadline , c )
2019-06-27 18:48:21 +00:00
m . StoreSeries ( series , 0 )
// These should be received by the client.
m . Start ( )
defer m . Stop ( )
b . ResetTimer ( )
for i := 0 ; i < b . N ; i ++ {
2021-11-30 12:21:53 +00:00
m . Append ( samples )
2022-10-10 15:08:46 +00:00
m . UpdateSeriesSegment ( series , i + 1 ) // simulate what wlog.Watcher.garbageCollectSeries does
2021-07-27 20:21:48 +00:00
m . SeriesReset ( i + 1 )
2019-06-27 18:48:21 +00:00
}
// Do not include shutdown
b . StopTimer ( )
}
2024-01-29 12:52:03 +00:00
// Check how long it takes to add N series, including external labels processing.
func BenchmarkStoreSeries ( b * testing . B ) {
externalLabels := [ ] labels . Label {
{ Name : "cluster" , Value : "mycluster" } ,
{ Name : "replica" , Value : "1" } ,
}
relabelConfigs := [ ] * relabel . Config { {
SourceLabels : model . LabelNames { "namespace" } ,
Separator : ";" ,
Regex : relabel . MustNewRegexp ( "kube.*" ) ,
TargetLabel : "job" ,
Replacement : "$1" ,
Action : relabel . Replace ,
} }
testCases := [ ] struct {
name string
externalLabels [ ] labels . Label
ts [ ] prompb . TimeSeries
relabelConfigs [ ] * relabel . Config
} {
{ name : "plain" } ,
{ name : "externalLabels" , externalLabels : externalLabels } ,
{ name : "relabel" , relabelConfigs : relabelConfigs } ,
{
name : "externalLabels+relabel" ,
externalLabels : externalLabels ,
relabelConfigs : relabelConfigs ,
} ,
}
// numSeries chosen to be big enough that StoreSeries dominates creating a new queue manager.
const numSeries = 1000
_ , series := createTimeseries ( 0 , numSeries , extraLabels ... )
for _ , tc := range testCases {
b . Run ( tc . name , func ( b * testing . B ) {
for i := 0 ; i < b . N ; i ++ {
c := NewTestWriteClient ( )
dir := b . TempDir ( )
cfg := config . DefaultQueueConfig
mcfg := config . DefaultMetadataConfig
metrics := newQueueManagerMetrics ( nil , "" , "" )
m := NewQueueManager ( metrics , nil , nil , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , mcfg , labels . EmptyLabels ( ) , nil , c , defaultFlushDeadline , newPool ( ) , newHighestTimestampMetric ( ) , nil , false , false )
m . externalLabels = tc . externalLabels
m . relabelConfigs = tc . relabelConfigs
m . StoreSeries ( series , 0 )
}
} )
}
}
2019-02-20 04:03:41 +00:00
func BenchmarkStartup ( b * testing . B ) {
dir := os . Getenv ( "WALDIR" )
if dir == "" {
2024-01-30 16:48:01 +00:00
b . Skip ( "WALDIR env var not set" )
2019-02-20 04:03:41 +00:00
}
2019-02-20 07:51:08 +00:00
// Find the second largest segment; we will replay up to this.
// (Second largest as WALWatcher will start tailing the largest).
2022-04-27 09:24:36 +00:00
dirents , err := os . ReadDir ( dir )
2020-10-29 09:43:23 +00:00
require . NoError ( b , err )
2019-02-20 07:51:08 +00:00
var segments [ ] int
for _ , dirent := range dirents {
if i , err := strconv . Atoi ( dirent . Name ( ) ) ; err != nil {
segments = append ( segments , i )
}
}
sort . Ints ( segments )
2019-02-20 04:03:41 +00:00
logger := log . NewLogfmtLogger ( log . NewSyncWriter ( os . Stdout ) )
logger = log . With ( logger , "caller" , log . DefaultCaller )
2020-11-19 15:23:03 +00:00
cfg := config . DefaultQueueConfig
mcfg := config . DefaultMetadataConfig
2019-02-20 04:03:41 +00:00
for n := 0 ; n < b . N ; n ++ {
2020-04-25 03:39:46 +00:00
metrics := newQueueManagerMetrics ( nil , "" , "" )
2020-06-24 13:41:52 +00:00
c := NewTestBlockedWriteClient ( )
2020-03-20 16:34:15 +00:00
m := NewQueueManager ( metrics , nil , nil , logger , dir ,
2019-02-20 04:03:41 +00:00
newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
2022-10-05 20:14:49 +00:00
cfg , mcfg , labels . EmptyLabels ( ) , nil , c , 1 * time . Minute , newPool ( ) , newHighestTimestampMetric ( ) , nil , false , false )
2019-11-27 00:53:11 +00:00
m . watcher . SetStartTime ( timestamp . Time ( math . MaxInt64 ) )
2019-09-19 09:15:41 +00:00
m . watcher . MaxSegment = segments [ len ( segments ) - 2 ]
err := m . watcher . Run ( )
2020-10-29 09:43:23 +00:00
require . NoError ( b , err )
2019-02-20 04:03:41 +00:00
}
}
2019-03-08 16:29:25 +00:00
func TestProcessExternalLabels ( t * testing . T ) {
2024-01-29 18:49:55 +00:00
b := labels . NewBuilder ( labels . EmptyLabels ( ) )
for i , tc := range [ ] struct {
2019-11-18 19:53:33 +00:00
labels labels . Labels
2022-03-09 22:35:14 +00:00
externalLabels [ ] labels . Label
2019-03-08 16:29:25 +00:00
expected labels . Labels
} {
// Test adding labels at the end.
{
2022-03-09 22:35:14 +00:00
labels : labels . FromStrings ( "a" , "b" ) ,
externalLabels : [ ] labels . Label { { Name : "c" , Value : "d" } } ,
expected : labels . FromStrings ( "a" , "b" , "c" , "d" ) ,
2019-03-08 16:29:25 +00:00
} ,
// Test adding labels at the beginning.
{
2022-03-09 22:35:14 +00:00
labels : labels . FromStrings ( "c" , "d" ) ,
externalLabels : [ ] labels . Label { { Name : "a" , Value : "b" } } ,
expected : labels . FromStrings ( "a" , "b" , "c" , "d" ) ,
2019-03-08 16:29:25 +00:00
} ,
// Test we don't override existing labels.
{
2022-03-09 22:35:14 +00:00
labels : labels . FromStrings ( "a" , "b" ) ,
externalLabels : [ ] labels . Label { { Name : "a" , Value : "c" } } ,
expected : labels . FromStrings ( "a" , "b" ) ,
2019-03-08 16:29:25 +00:00
} ,
2021-04-30 16:37:07 +00:00
// Test empty externalLabels.
{
2022-03-09 22:35:14 +00:00
labels : labels . FromStrings ( "a" , "b" ) ,
externalLabels : [ ] labels . Label { } ,
expected : labels . FromStrings ( "a" , "b" ) ,
2021-04-30 16:37:07 +00:00
} ,
// Test empty labels.
{
2022-03-09 22:35:14 +00:00
labels : labels . EmptyLabels ( ) ,
externalLabels : [ ] labels . Label { { Name : "a" , Value : "b" } } ,
expected : labels . FromStrings ( "a" , "b" ) ,
2021-04-30 16:37:07 +00:00
} ,
// Test labels is longer than externalLabels.
{
2022-03-09 22:35:14 +00:00
labels : labels . FromStrings ( "a" , "b" , "c" , "d" ) ,
externalLabels : [ ] labels . Label { { Name : "e" , Value : "f" } } ,
expected : labels . FromStrings ( "a" , "b" , "c" , "d" , "e" , "f" ) ,
2021-04-30 16:37:07 +00:00
} ,
// Test externalLabels is longer than labels.
{
2022-03-09 22:35:14 +00:00
labels : labels . FromStrings ( "c" , "d" ) ,
externalLabels : [ ] labels . Label { { Name : "a" , Value : "b" } , { Name : "e" , Value : "f" } } ,
expected : labels . FromStrings ( "a" , "b" , "c" , "d" , "e" , "f" ) ,
} ,
// Adding with and without clashing labels.
{
labels : labels . FromStrings ( "a" , "b" , "c" , "d" ) ,
externalLabels : [ ] labels . Label { { Name : "a" , Value : "xxx" } , { Name : "c" , Value : "yyy" } , { Name : "e" , Value : "f" } } ,
expected : labels . FromStrings ( "a" , "b" , "c" , "d" , "e" , "f" ) ,
2021-04-30 16:37:07 +00:00
} ,
2019-03-08 16:29:25 +00:00
} {
2024-01-29 18:49:55 +00:00
b . Reset ( tc . labels )
processExternalLabels ( b , tc . externalLabels )
2024-01-24 16:48:22 +00:00
testutil . RequireEqual ( t , tc . expected , b . Labels ( ) , "test %d" , i )
2019-03-08 16:29:25 +00:00
}
}
2019-12-24 15:59:50 +00:00
func TestCalculateDesiredShards ( t * testing . T ) {
cfg := config . DefaultQueueConfig
2023-03-27 15:32:25 +00:00
_ , m := newTestClientAndQueueManager ( t , defaultFlushDeadline )
samplesIn := m . dataIn
2019-12-24 15:59:50 +00:00
// Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual
// processing.
m . Start ( )
m . Stop ( )
inputRate := int64 ( 50000 )
var pendingSamples int64
// Two minute startup, no samples are sent.
startedAt := time . Now ( ) . Add ( - 2 * time . Minute )
// helper function for adding samples.
addSamples := func ( s int64 , ts time . Duration ) {
pendingSamples += s
samplesIn . incr ( s )
samplesIn . tick ( )
2020-09-24 18:44:18 +00:00
m . highestRecvTimestamp . Set ( float64 ( startedAt . Add ( ts ) . Unix ( ) ) )
2019-12-24 15:59:50 +00:00
}
// helper function for sending samples.
sendSamples := func ( s int64 , ts time . Duration ) {
pendingSamples -= s
2021-05-06 20:53:52 +00:00
m . dataOut . incr ( s )
m . dataOutDuration . incr ( int64 ( m . numShards ) * int64 ( shardUpdateDuration ) )
2019-12-24 15:59:50 +00:00
// highest sent is how far back pending samples would be at our input rate.
highestSent := startedAt . Add ( ts - time . Duration ( pendingSamples / inputRate ) * time . Second )
2020-04-25 03:39:46 +00:00
m . metrics . highestSentTimestamp . Set ( float64 ( highestSent . Unix ( ) ) )
2019-12-24 15:59:50 +00:00
2020-07-30 07:45:42 +00:00
m . lastSendTimestamp . Store ( time . Now ( ) . Unix ( ) )
2019-12-24 15:59:50 +00:00
}
ts := time . Duration ( 0 )
for ; ts < 120 * time . Second ; ts += shardUpdateDuration {
addSamples ( inputRate * int64 ( shardUpdateDuration / time . Second ) , ts )
m . numShards = m . calculateDesiredShards ( )
2020-10-29 09:43:23 +00:00
require . Equal ( t , 1 , m . numShards )
2019-12-24 15:59:50 +00:00
}
// Assume 100ms per request, or 10 requests per second per shard.
// Shard calculation should never drop below barely keeping up.
minShards := int ( inputRate ) / cfg . MaxSamplesPerSend / 10
// This test should never go above 200 shards, that would be more resources than needed.
maxShards := 200
for ; ts < 15 * time . Minute ; ts += shardUpdateDuration {
sin := inputRate * int64 ( shardUpdateDuration / time . Second )
addSamples ( sin , ts )
sout := int64 ( m . numShards * cfg . MaxSamplesPerSend ) * int64 ( shardUpdateDuration / ( 100 * time . Millisecond ) )
// You can't send samples that don't exist so cap at the number of pending samples.
if sout > pendingSamples {
sout = pendingSamples
}
sendSamples ( sout , ts )
t . Log ( "desiredShards" , m . numShards , "pendingSamples" , pendingSamples )
m . numShards = m . calculateDesiredShards ( )
2020-10-29 09:43:23 +00:00
require . GreaterOrEqual ( t , m . numShards , minShards , "Shards are too low. desiredShards=%d, minShards=%d, t_seconds=%d" , m . numShards , minShards , ts / time . Second )
require . LessOrEqual ( t , m . numShards , maxShards , "Shards are too high. desiredShards=%d, maxShards=%d, t_seconds=%d" , m . numShards , maxShards , ts / time . Second )
2019-12-24 15:59:50 +00:00
}
2020-10-29 09:43:23 +00:00
require . Equal ( t , int64 ( 0 ) , pendingSamples , "Remote write never caught up, there are still %d pending samples." , pendingSamples )
2019-12-24 15:59:50 +00:00
}
2020-04-30 22:54:02 +00:00
2021-08-29 16:55:45 +00:00
func TestCalculateDesiredShardsDetail ( t * testing . T ) {
2023-03-27 15:32:25 +00:00
_ , m := newTestClientAndQueueManager ( t , defaultFlushDeadline )
samplesIn := m . dataIn
2021-08-29 16:55:45 +00:00
for _ , tc := range [ ] struct {
name string
prevShards int
dataIn int64 // Quantities normalised to seconds.
dataOut int64
dataDropped int64
dataOutDuration float64
backlog float64
expectedShards int
} {
{
name : "nothing in or out 1" ,
prevShards : 1 ,
expectedShards : 1 , // Shards stays the same.
} ,
{
name : "nothing in or out 10" ,
prevShards : 10 ,
expectedShards : 10 , // Shards stays the same.
} ,
{
name : "steady throughput" ,
prevShards : 1 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 1 ,
expectedShards : 1 ,
} ,
{
name : "scale down" ,
prevShards : 10 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 5 ,
expectedShards : 5 ,
} ,
{
name : "scale down constrained" ,
prevShards : 7 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 5 ,
expectedShards : 7 ,
} ,
{
name : "scale up" ,
prevShards : 1 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 10 ,
expectedShards : 10 ,
} ,
{
name : "scale up constrained" ,
prevShards : 8 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 10 ,
expectedShards : 8 ,
} ,
{
name : "backlogged 20s" ,
prevShards : 2 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 2 ,
backlog : 20 ,
2021-08-29 17:11:13 +00:00
expectedShards : 4 ,
2021-08-29 16:55:45 +00:00
} ,
{
name : "backlogged 90s" ,
2021-08-29 17:11:13 +00:00
prevShards : 4 ,
2021-08-29 16:55:45 +00:00
dataIn : 10 ,
dataOut : 10 ,
2021-08-29 17:11:13 +00:00
dataOutDuration : 4 ,
2021-08-29 16:55:45 +00:00
backlog : 90 ,
2021-08-29 17:11:13 +00:00
expectedShards : 22 ,
2021-08-29 16:55:45 +00:00
} ,
{
name : "backlog reduced" ,
2021-08-29 17:11:13 +00:00
prevShards : 22 ,
2021-08-29 16:55:45 +00:00
dataIn : 10 ,
dataOut : 20 ,
dataOutDuration : 4 ,
backlog : 10 ,
2021-08-29 17:11:13 +00:00
expectedShards : 3 ,
2021-08-29 16:55:45 +00:00
} ,
{
name : "backlog eliminated" ,
prevShards : 3 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 2 ,
backlog : 0 ,
expectedShards : 2 , // Shard back down.
} ,
{
name : "slight slowdown" ,
prevShards : 1 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 1.2 ,
2021-08-30 12:39:16 +00:00
expectedShards : 2 , // 1.2 is rounded up to 2.
2021-08-29 16:55:45 +00:00
} ,
{
name : "bigger slowdown" ,
prevShards : 1 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 1.4 ,
2021-08-30 12:39:16 +00:00
expectedShards : 2 ,
2021-08-29 16:55:45 +00:00
} ,
{
name : "speed up" ,
prevShards : 2 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 1.2 ,
backlog : 0 ,
expectedShards : 2 , // No reaction - 1.2 is rounded up to 2.
} ,
{
name : "speed up more" ,
prevShards : 2 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 0.9 ,
backlog : 0 ,
expectedShards : 1 ,
} ,
{
name : "marginal decision A" ,
prevShards : 3 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 2.01 ,
backlog : 0 ,
expectedShards : 3 , // 2.01 rounds up to 3.
} ,
{
name : "marginal decision B" ,
prevShards : 3 ,
dataIn : 10 ,
dataOut : 10 ,
dataOutDuration : 1.99 ,
backlog : 0 ,
expectedShards : 2 , // 1.99 rounds up to 2.
} ,
} {
t . Run ( tc . name , func ( t * testing . T ) {
m . numShards = tc . prevShards
forceEMWA ( samplesIn , tc . dataIn * int64 ( shardUpdateDuration / time . Second ) )
samplesIn . tick ( )
forceEMWA ( m . dataOut , tc . dataOut * int64 ( shardUpdateDuration / time . Second ) )
forceEMWA ( m . dataDropped , tc . dataDropped * int64 ( shardUpdateDuration / time . Second ) )
forceEMWA ( m . dataOutDuration , int64 ( tc . dataOutDuration * float64 ( shardUpdateDuration ) ) )
m . highestRecvTimestamp . value = tc . backlog // Not Set() because it can only increase value.
require . Equal ( t , tc . expectedShards , m . calculateDesiredShards ( ) )
} )
}
}
func forceEMWA ( r * ewmaRate , rate int64 ) {
r . init = false
r . newEvents . Store ( rate )
}
2020-04-30 22:54:02 +00:00
func TestQueueManagerMetrics ( t * testing . T ) {
reg := prometheus . NewPedanticRegistry ( )
metrics := newQueueManagerMetrics ( reg , "name" , "http://localhost:1234" )
// Make sure metrics pass linting.
problems , err := client_testutil . GatherAndLint ( reg )
require . NoError ( t , err )
require . Empty ( t , problems , "Metric linting problems detected: %v" , problems )
// Make sure all metrics were unregistered. A failure here means you need to
// unregister a metric in `queueManagerMetrics.unregister()`.
metrics . unregister ( )
err = client_testutil . GatherAndCompare ( reg , strings . NewReader ( "" ) )
require . NoError ( t , err )
}
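// TestQueue_FlushAndShutdownDoesNotDeadlock overfills a queue and then runs
// FlushAndShutdown and Batch concurrently, failing if the two deadlock.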
func TestQueue_FlushAndShutdownDoesNotDeadlock ( t * testing . T ) {
capacity := 100
batchSize := 10
queue := newQueue ( batchSize , capacity )
for i := 0 ; i < capacity + batchSize ; i ++ {
queue . Append ( timeSeries { } )
}
done := make ( chan struct { } )
go queue . FlushAndShutdown ( done )
go func ( ) {
// Give enough time for FlushAndShutdown to acquire the lock. queue.Batch()
// should not block forever even if the lock is acquired.
time . Sleep ( 10 * time . Millisecond )
queue . Batch ( )
close ( done )
} ( )
select {
case <- done :
case <- time . After ( 2 * time . Second ) :
t . Error ( "Deadlock in FlushAndShutdown detected" )
pprof . Lookup ( "goroutine" ) . WriteTo ( os . Stdout , 1 )
t . FailNow ( )
}
}
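// TestDropOldTimeSeries appends a mix of backdated and current samples with a
// 60s SampleAgeLimit and expects only the current ones to reach the write client.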
func TestDropOldTimeSeries ( t * testing . T ) {
size := 10
nSeries := 6
nSamples := config . DefaultQueueConfig . Capacity * size
samples , newSamples , series := createTimeseriesWithOldSamples ( nSamples , nSeries )
c := NewTestWriteClient ( )
c . expectSamples ( newSamples , series )
cfg := config . DefaultQueueConfig
mcfg := config . DefaultMetadataConfig
cfg . MaxShards = 1
cfg . SampleAgeLimit = model . Duration ( 60 * time . Second )
m := newTestQueueManager ( t , cfg , mcfg , defaultFlushDeadline , c )
m . StoreSeries ( series , 0 )
m . Start ( )
defer m . Stop ( )
m . Append ( samples )
c . waitForExpectedData ( t )
}
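// TestIsSampleOld checks both sides of the age-limit boundary for a 60s limit.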
func TestIsSampleOld ( t * testing . T ) {
currentTime := time . Now ( )
require . True ( t , isSampleOld ( currentTime , 60 * time . Second , timestamp . FromTime ( currentTime . Add ( - 61 * time . Second ) ) ) )
require . False ( t , isSampleOld ( currentTime , 60 * time . Second , timestamp . FromTime ( currentTime . Add ( - 59 * time . Second ) ) ) )
}
// Simulates a scenario in which the remote write endpoint is down and a subset
// of samples is dropped, due to the age limit, while the queue is backing off.
func TestSendSamplesWithBackoffWithSampleAgeLimit ( t * testing . T ) {
maxSamplesPerSend := 10
sampleAgeLimit := time . Second
cfg := config . DefaultQueueConfig
cfg . MaxShards = 1
cfg . SampleAgeLimit = model . Duration ( sampleAgeLimit )
// Set the batch send deadline to 5 minutes to effectively disable it.
cfg . BatchSendDeadline = model . Duration ( time . Minute * 5 )
cfg . Capacity = 10 * maxSamplesPerSend // more than the amount of data we append in the test
cfg . MaxBackoff = model . Duration ( time . Millisecond * 100 )
cfg . MinBackoff = model . Duration ( time . Millisecond * 100 )
cfg . MaxSamplesPerSend = maxSamplesPerSend
metadataCfg := config . DefaultMetadataConfig
metadataCfg . Send = true
metadataCfg . SendInterval = model . Duration ( time . Second * 60 )
metadataCfg . MaxSamplesPerSend = maxSamplesPerSend
c := NewTestWriteClient ( )
c . withWaitGroup = false
m := newTestQueueManager ( t , cfg , metadataCfg , time . Second , c )
m . Start ( )
batchID := 0
expectedSamples := map [ string ] [ ] prompb . Sample { }
appendData := func ( numberOfSeries int , timeAdd time . Duration , shouldBeDropped bool ) {
t . Log ( ">>>> Appending series " , numberOfSeries , " as batch ID " , batchID , " with timeAdd " , timeAdd , " and should be dropped " , shouldBeDropped )
samples , series := createTimeseriesWithRandomLabelCount ( strconv . Itoa ( batchID ) , numberOfSeries , timeAdd , 9 )
m . StoreSeries ( series , batchID )
sent := m . Append ( samples )
require . True ( t , sent , "samples not sent" )
if ! shouldBeDropped {
for _ , s := range samples {
tsID := getSeriesIDFromRef ( series [ s . Ref ] )
expectedSamples [ tsID ] = append ( c . expectedSamples [ tsID ] , prompb . Sample {
Timestamp : s . T ,
Value : s . V ,
} )
}
}
batchID ++
}
timeShift := - time . Millisecond * 5
c . SetReturnError ( RecoverableError { context . DeadlineExceeded , defaultBackoff } )
// While the endpoint keeps returning a recoverable error, retries back off and
// these batches age past SampleAgeLimit, so they are expected to be dropped.
appendData ( maxSamplesPerSend / 2 , timeShift , true )
time . Sleep ( sampleAgeLimit )
appendData ( maxSamplesPerSend / 2 , timeShift , true )
time . Sleep ( sampleAgeLimit / 10 )
appendData ( maxSamplesPerSend / 2 , timeShift , true )
time . Sleep ( 2 * sampleAgeLimit )
// This batch is appended only half an age limit before the endpoint recovers,
// so it should still be delivered.
appendData ( 2 * maxSamplesPerSend , timeShift , false )
time . Sleep ( sampleAgeLimit / 2 )
c . SetReturnError ( nil )
appendData ( 5 , timeShift , false )
m . Stop ( )
if diff := cmp . Diff ( expectedSamples , c . receivedSamples ) ; diff != "" {
t . Errorf ( "mismatch (-want +got):\n%s" , diff )
}
}
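// createTimeseriesWithRandomLabelCount builds one sample per series at
// now+timeAdd and gives each series a pseudo-random number of labels; the
// fixed seed keeps runs reproducible.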
func createTimeseriesWithRandomLabelCount ( id string , seriesCount int , timeAdd time . Duration , maxLabels int ) ( [ ] record . RefSample , [ ] record . RefSeries ) {
samples := [ ] record . RefSample { }
series := [ ] record . RefSeries { }
// Use a fixed rand source so tests are reproducible.
r := rand . New ( rand . NewSource ( 99 ) )
for i := 0 ; i < seriesCount ; i ++ {
s := record . RefSample {
Ref : chunks . HeadSeriesRef ( i ) ,
T : time . Now ( ) . Add ( timeAdd ) . UnixMilli ( ) ,
V : r . Float64 ( ) ,
}
samples = append ( samples , s )
labelsCount := r . Intn ( maxLabels )
lb := labels . NewScratchBuilder ( 1 + labelsCount )
lb . Add ( "__name__" , "batch_" + id + "_id_" + strconv . Itoa ( i ) )
for j := 1 ; j < labelsCount + 1 ; j ++ {
// Use the same string for both the label name and value.
label := "batch_" + id + "_label_" + strconv . Itoa ( j )
lb . Add ( label , label )
}
series = append ( series , record . RefSeries {
Ref : chunks . HeadSeriesRef ( i ) ,
Labels : lb . Labels ( ) ,
} )
}
return samples , series
}
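// createTimeseriesWithOldSamples returns all generated samples, the subset
// with current timestamps, and the series; the first half of each series'
// samples is backdated by five minutes.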
func createTimeseriesWithOldSamples ( numSamples , numSeries int , extraLabels ... labels . Label ) ( [ ] record . RefSample , [ ] record . RefSample , [ ] record . RefSeries ) {
newSamples := make ( [ ] record . RefSample , 0 , numSamples )
samples := make ( [ ] record . RefSample , 0 , numSamples )
series := make ( [ ] record . RefSeries , 0 , numSeries )
lb := labels . NewScratchBuilder ( 1 + len ( extraLabels ) )
for i := 0 ; i < numSeries ; i ++ {
name := fmt . Sprintf ( "test_metric_%d" , i )
// We create half of the samples in the past.
past := timestamp . FromTime ( time . Now ( ) . Add ( - 5 * time . Minute ) )
for j := 0 ; j < numSamples / 2 ; j ++ {
samples = append ( samples , record . RefSample {
Ref : chunks . HeadSeriesRef ( i ) ,
T : past + int64 ( j ) ,
V : float64 ( i ) ,
} )
}
for j := 0 ; j < numSamples / 2 ; j ++ {
sample := record . RefSample {
Ref : chunks . HeadSeriesRef ( i ) ,
T : time . Now ( ) . UnixMilli ( ) + int64 ( j ) ,
V : float64 ( i ) ,
}
samples = append ( samples , sample )
newSamples = append ( newSamples , sample )
}
// Create labels consisting of the series name plus any extra labels supplied.
lb . Reset ( )
lb . Add ( labels . MetricName , name )
for _ , l := range extraLabels {
lb . Add ( l . Name , l . Value )
}
lb . Sort ( )
series = append ( series , record . RefSeries {
Ref : chunks . HeadSeriesRef ( i ) ,
Labels : lb . Labels ( ) ,
} )
}
return samples , newSamples , series
}
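// filterTsLimit reports whether the first sample of ts is older than limit;
// buildTimeSeries drops the series for which the filter returns true.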
func filterTsLimit ( limit int64 , ts prompb . TimeSeries ) bool {
return limit > ts . Samples [ 0 ] . Timestamp
}
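// TestBuildTimeSeries checks that buildTimeSeries returns the highest and
// lowest retained timestamps, the filtered series, and the dropped-sample count.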
func TestBuildTimeSeries ( t * testing . T ) {
testCases := [ ] struct {
name string
ts [ ] prompb . TimeSeries
filter func ( ts prompb . TimeSeries ) bool
lowestTs int64
highestTs int64
droppedSamples int
responseLen int
} {
{
name : "No filter applied" ,
ts : [ ] prompb . TimeSeries {
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567890 ,
Value : 1.23 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567891 ,
Value : 2.34 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567892 ,
Value : 3.34 ,
} ,
} ,
} ,
} ,
filter : nil ,
responseLen : 3 ,
lowestTs : 1234567890 ,
highestTs : 1234567892 ,
} ,
{
name : "Filter applied, samples in order" ,
ts : [ ] prompb . TimeSeries {
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567890 ,
Value : 1.23 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567891 ,
Value : 2.34 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567892 ,
Value : 3.45 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567893 ,
Value : 3.45 ,
} ,
} ,
} ,
} ,
filter : func ( ts prompb . TimeSeries ) bool { return filterTsLimit ( 1234567892 , ts ) } ,
responseLen : 2 ,
lowestTs : 1234567892 ,
highestTs : 1234567893 ,
droppedSamples : 2 ,
} ,
{
name : "Filter applied, samples out of order" ,
ts : [ ] prompb . TimeSeries {
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567892 ,
Value : 3.45 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567890 ,
Value : 1.23 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567893 ,
Value : 3.45 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567891 ,
Value : 2.34 ,
} ,
} ,
} ,
} ,
filter : func ( ts prompb . TimeSeries ) bool { return filterTsLimit ( 1234567892 , ts ) } ,
responseLen : 2 ,
lowestTs : 1234567892 ,
highestTs : 1234567893 ,
droppedSamples : 2 ,
} ,
{
name : "Filter applied, samples not consecutive" ,
ts : [ ] prompb . TimeSeries {
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567890 ,
Value : 1.23 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567892 ,
Value : 3.45 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567895 ,
Value : 6.78 ,
} ,
} ,
} ,
{
Samples : [ ] prompb . Sample {
{
Timestamp : 1234567897 ,
Value : 6.78 ,
} ,
} ,
} ,
} ,
filter : func ( ts prompb . TimeSeries ) bool { return filterTsLimit ( 1234567895 , ts ) } ,
responseLen : 2 ,
lowestTs : 1234567895 ,
highestTs : 1234567897 ,
droppedSamples : 2 ,
} ,
}
// Run the test cases
for _ , tc := range testCases {
t . Run ( tc . name , func ( t * testing . T ) {
highest , lowest , result , droppedSamples , _ , _ := buildTimeSeries ( tc . ts , tc . filter )
require . NotNil ( t , result )
require . Len ( t , result , tc . responseLen )
require . Equal ( t , tc . highestTs , highest )
require . Equal ( t , tc . lowestTs , lowest )
require . Equal ( t , tc . droppedSamples , droppedSamples )
} )
}
}
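// BenchmarkBuildTimeSeries measures buildTimeSeries over 10k single-sample
// series with an age filter applied.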
func BenchmarkBuildTimeSeries ( b * testing . B ) {
// Send one sample per series, which is the typical remote_write case
const numSamples = 10000
filter := func ( ts prompb . TimeSeries ) bool { return filterTsLimit ( 99 , ts ) }
for i := 0 ; i < b . N ; i ++ {
samples := createProtoTimeseriesWithOld ( numSamples , 100 , extraLabels ... )
_ , _ , result , _ , _ , _ := buildTimeSeries ( samples , filter )
require . NotNil ( b , result )
}
}