mirror of https://github.com/prometheus/prometheus
2295 lines
71 KiB
Go
2295 lines
71 KiB
Go
// Copyright 2013 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package remote
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"os"
|
|
"runtime/pprof"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/golang/snappy"
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/common/promslog"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/relabel"
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
"github.com/prometheus/prometheus/prompb"
|
|
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
|
|
"github.com/prometheus/prometheus/scrape"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/prometheus/tsdb/record"
|
|
"github.com/prometheus/prometheus/util/runutil"
|
|
"github.com/prometheus/prometheus/util/testutil"
|
|
)
|
|
|
|
const defaultFlushDeadline = 1 * time.Minute
|
|
|
|
func newHighestTimestampMetric() *maxTimestamp {
|
|
return &maxTimestamp{
|
|
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Namespace: namespace,
|
|
Subsystem: subsystem,
|
|
Name: "highest_timestamp_in_seconds",
|
|
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet",
|
|
}),
|
|
}
|
|
}
|
|
|
|
func TestBasicContentNegotiation(t *testing.T) {
|
|
queueConfig := config.DefaultQueueConfig
|
|
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
queueConfig.MaxShards = 1
|
|
|
|
// We need to set URL's so that metric creation doesn't panic.
|
|
writeConfig := baseRemoteWriteConfig("http://test-storage.com")
|
|
writeConfig.QueueConfig = queueConfig
|
|
|
|
conf := &config.Config{
|
|
GlobalConfig: config.DefaultGlobalConfig,
|
|
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
|
writeConfig,
|
|
},
|
|
}
|
|
|
|
for _, tc := range []struct {
|
|
name string
|
|
senderProtoMsg config.RemoteWriteProtoMsg
|
|
receiverProtoMsg config.RemoteWriteProtoMsg
|
|
injectErrs []error
|
|
expectFail bool
|
|
}{
|
|
{
|
|
name: "v2 happy path",
|
|
senderProtoMsg: config.RemoteWriteProtoMsgV2, receiverProtoMsg: config.RemoteWriteProtoMsgV2,
|
|
injectErrs: []error{nil},
|
|
},
|
|
{
|
|
name: "v1 happy path",
|
|
senderProtoMsg: config.RemoteWriteProtoMsgV1, receiverProtoMsg: config.RemoteWriteProtoMsgV1,
|
|
injectErrs: []error{nil},
|
|
},
|
|
// Test a case where the v1 request has a temporary delay but goes through on retry.
|
|
{
|
|
name: "v1 happy path with one 5xx retry",
|
|
senderProtoMsg: config.RemoteWriteProtoMsgV1, receiverProtoMsg: config.RemoteWriteProtoMsgV1,
|
|
injectErrs: []error{RecoverableError{errors.New("pretend 500"), 1}, nil},
|
|
},
|
|
// Repeat the above test but with v2. The request has a temporary delay but goes through on retry.
|
|
{
|
|
name: "v2 happy path with one 5xx retry",
|
|
senderProtoMsg: config.RemoteWriteProtoMsgV2, receiverProtoMsg: config.RemoteWriteProtoMsgV2,
|
|
injectErrs: []error{RecoverableError{errors.New("pretend 500"), 1}, nil},
|
|
},
|
|
// A few error cases of v2 talking to v1.
|
|
{
|
|
name: "v2 talks to v1 that gives 400 or 415",
|
|
senderProtoMsg: config.RemoteWriteProtoMsgV2, receiverProtoMsg: config.RemoteWriteProtoMsgV1,
|
|
injectErrs: []error{errors.New("pretend unrecoverable err")},
|
|
expectFail: true,
|
|
},
|
|
{
|
|
name: "v2 talks to (broken) v1 that tries to unmarshal v2 payload with v1 proto",
|
|
senderProtoMsg: config.RemoteWriteProtoMsgV2, receiverProtoMsg: config.RemoteWriteProtoMsgV1,
|
|
injectErrs: []error{nil},
|
|
expectFail: true, // We detect this thanks to https://github.com/prometheus/prometheus/issues/14359
|
|
},
|
|
// Opposite, v1 talking to v2 only server.
|
|
{
|
|
name: "v1 talks to v2 that gives 400 or 415",
|
|
senderProtoMsg: config.RemoteWriteProtoMsgV1, receiverProtoMsg: config.RemoteWriteProtoMsgV2,
|
|
injectErrs: []error{errors.New("pretend unrecoverable err")},
|
|
expectFail: true,
|
|
},
|
|
} {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
dir := t.TempDir()
|
|
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
|
|
defer s.Close()
|
|
|
|
var (
|
|
series []record.RefSeries
|
|
metadata []record.RefMetadata
|
|
samples []record.RefSample
|
|
)
|
|
|
|
// Generates same series in both cases.
|
|
samples, series = createTimeseries(1, 1)
|
|
metadata = createSeriesMetadata(series)
|
|
|
|
// Apply new config.
|
|
queueConfig.Capacity = len(samples)
|
|
queueConfig.MaxSamplesPerSend = len(samples)
|
|
// For now we only ever have a single rw config in this test.
|
|
conf.RemoteWriteConfigs[0].ProtobufMessage = tc.senderProtoMsg
|
|
require.NoError(t, s.ApplyConfig(conf))
|
|
hash, err := toHash(writeConfig)
|
|
require.NoError(t, err)
|
|
qm := s.rws.queues[hash]
|
|
|
|
c := NewTestWriteClient(tc.receiverProtoMsg)
|
|
c.injectErrors(tc.injectErrs)
|
|
qm.SetClient(c)
|
|
|
|
qm.StoreSeries(series, 0)
|
|
qm.StoreMetadata(metadata)
|
|
|
|
// Do we expect some data back?
|
|
if !tc.expectFail {
|
|
c.expectSamples(samples, series)
|
|
} else {
|
|
c.expectSamples(nil, nil)
|
|
}
|
|
|
|
// Schedule send.
|
|
qm.Append(samples)
|
|
|
|
if !tc.expectFail {
|
|
// No error expected, so wait for data.
|
|
c.waitForExpectedData(t, 5*time.Second)
|
|
require.Equal(t, 0.0, client_testutil.ToFloat64(qm.metrics.failedSamplesTotal))
|
|
} else {
|
|
// Wait for failure to be recorded in metrics.
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
require.NoError(t, runutil.Retry(500*time.Millisecond, ctx.Done(), func() error {
|
|
if client_testutil.ToFloat64(qm.metrics.failedSamplesTotal) != 1.0 {
|
|
return fmt.Errorf("expected one sample failed in qm metrics; got %v", client_testutil.ToFloat64(qm.metrics.failedSamplesTotal))
|
|
}
|
|
return nil
|
|
}))
|
|
}
|
|
|
|
// samplesTotal means attempts.
|
|
require.Equal(t, float64(len(tc.injectErrs)), client_testutil.ToFloat64(qm.metrics.samplesTotal))
|
|
require.Equal(t, float64(len(tc.injectErrs)-1), client_testutil.ToFloat64(qm.metrics.retriedSamplesTotal))
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSampleDelivery(t *testing.T) {
|
|
// Let's create an even number of send batches, so we don't run into the
|
|
// batch timeout case.
|
|
n := 3
|
|
|
|
queueConfig := config.DefaultQueueConfig
|
|
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
queueConfig.MaxShards = 1
|
|
|
|
// We need to set URL's so that metric creation doesn't panic.
|
|
writeConfig := baseRemoteWriteConfig("http://test-storage.com")
|
|
writeConfig.QueueConfig = queueConfig
|
|
writeConfig.SendExemplars = true
|
|
writeConfig.SendNativeHistograms = true
|
|
|
|
conf := &config.Config{
|
|
GlobalConfig: config.DefaultGlobalConfig,
|
|
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
|
writeConfig,
|
|
},
|
|
}
|
|
for _, tc := range []struct {
|
|
protoMsg config.RemoteWriteProtoMsg
|
|
|
|
name string
|
|
samples bool
|
|
exemplars bool
|
|
histograms bool
|
|
floatHistograms bool
|
|
}{
|
|
{protoMsg: config.RemoteWriteProtoMsgV1, samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
|
|
{protoMsg: config.RemoteWriteProtoMsgV1, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
|
|
{protoMsg: config.RemoteWriteProtoMsgV1, samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
|
|
{protoMsg: config.RemoteWriteProtoMsgV1, samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
|
|
{protoMsg: config.RemoteWriteProtoMsgV1, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
|
|
|
|
// TODO(alexg): update some portion of this test to check for the 2.0 metadata
|
|
{protoMsg: config.RemoteWriteProtoMsgV2, samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
|
|
{protoMsg: config.RemoteWriteProtoMsgV2, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
|
|
{protoMsg: config.RemoteWriteProtoMsgV2, samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
|
|
{protoMsg: config.RemoteWriteProtoMsgV2, samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
|
|
{protoMsg: config.RemoteWriteProtoMsgV2, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
|
|
} {
|
|
t.Run(fmt.Sprintf("%s-%s", tc.protoMsg, tc.name), func(t *testing.T) {
|
|
dir := t.TempDir()
|
|
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
|
|
defer s.Close()
|
|
|
|
var (
|
|
series []record.RefSeries
|
|
metadata []record.RefMetadata
|
|
samples []record.RefSample
|
|
exemplars []record.RefExemplar
|
|
histograms []record.RefHistogramSample
|
|
floatHistograms []record.RefFloatHistogramSample
|
|
)
|
|
|
|
// Generates same series in both cases.
|
|
if tc.samples {
|
|
samples, series = createTimeseries(n, n)
|
|
}
|
|
if tc.exemplars {
|
|
exemplars, series = createExemplars(n, n)
|
|
}
|
|
if tc.histograms {
|
|
histograms, _, series = createHistograms(n, n, false)
|
|
}
|
|
if tc.floatHistograms {
|
|
_, floatHistograms, series = createHistograms(n, n, true)
|
|
}
|
|
metadata = createSeriesMetadata(series)
|
|
|
|
// Apply new config.
|
|
queueConfig.Capacity = len(samples)
|
|
queueConfig.MaxSamplesPerSend = len(samples) / 2
|
|
// For now we only ever have a single rw config in this test.
|
|
conf.RemoteWriteConfigs[0].ProtobufMessage = tc.protoMsg
|
|
require.NoError(t, s.ApplyConfig(conf))
|
|
hash, err := toHash(writeConfig)
|
|
require.NoError(t, err)
|
|
qm := s.rws.queues[hash]
|
|
|
|
c := NewTestWriteClient(tc.protoMsg)
|
|
qm.SetClient(c)
|
|
|
|
qm.StoreSeries(series, 0)
|
|
qm.StoreMetadata(metadata)
|
|
|
|
// Send first half of data.
|
|
c.expectSamples(samples[:len(samples)/2], series)
|
|
c.expectExemplars(exemplars[:len(exemplars)/2], series)
|
|
c.expectHistograms(histograms[:len(histograms)/2], series)
|
|
c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series)
|
|
qm.Append(samples[:len(samples)/2])
|
|
qm.AppendExemplars(exemplars[:len(exemplars)/2])
|
|
qm.AppendHistograms(histograms[:len(histograms)/2])
|
|
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
|
|
c.waitForExpectedData(t, 30*time.Second)
|
|
|
|
// Send second half of data.
|
|
c.expectSamples(samples[len(samples)/2:], series)
|
|
c.expectExemplars(exemplars[len(exemplars)/2:], series)
|
|
c.expectHistograms(histograms[len(histograms)/2:], series)
|
|
c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
|
|
qm.Append(samples[len(samples)/2:])
|
|
qm.AppendExemplars(exemplars[len(exemplars)/2:])
|
|
qm.AppendHistograms(histograms[len(histograms)/2:])
|
|
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
|
|
c.waitForExpectedData(t, 30*time.Second)
|
|
})
|
|
}
|
|
}
|
|
|
|
func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, protoMsg config.RemoteWriteProtoMsg) (*TestWriteClient, *QueueManager) {
|
|
c := NewTestWriteClient(protoMsg)
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
|
|
}
|
|
|
|
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
|
|
dir := t.TempDir()
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
|
|
|
|
return m
|
|
}
|
|
|
|
func testDefaultQueueConfig() config.QueueConfig {
|
|
cfg := config.DefaultQueueConfig
|
|
// For faster unit tests we don't wait default 5 seconds.
|
|
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
return cfg
|
|
}
|
|
|
|
func TestMetadataDelivery(t *testing.T) {
|
|
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
metadata := []scrape.MetricMetadata{}
|
|
numMetadata := 1532
|
|
for i := 0; i < numMetadata; i++ {
|
|
metadata = append(metadata, scrape.MetricMetadata{
|
|
Metric: "prometheus_remote_storage_sent_metadata_bytes_total_" + strconv.Itoa(i),
|
|
Type: model.MetricTypeCounter,
|
|
Help: "a nice help text",
|
|
Unit: "",
|
|
})
|
|
}
|
|
|
|
m.AppendWatcherMetadata(context.Background(), metadata)
|
|
|
|
require.Equal(t, 0.0, client_testutil.ToFloat64(m.metrics.failedMetadataTotal))
|
|
require.Len(t, c.receivedMetadata, numMetadata)
|
|
// One more write than the rounded quotient should be performed in order to get samples that didn't
|
|
// fit into MaxSamplesPerSend.
|
|
require.Equal(t, numMetadata/config.DefaultMetadataConfig.MaxSamplesPerSend+1, c.writesReceived)
|
|
// Make sure the last samples were sent.
|
|
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
|
|
}
|
|
|
|
func TestWALMetadataDelivery(t *testing.T) {
|
|
dir := t.TempDir()
|
|
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
|
|
defer s.Close()
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
cfg.MaxShards = 1
|
|
|
|
writeConfig := baseRemoteWriteConfig("http://test-storage.com")
|
|
writeConfig.QueueConfig = cfg
|
|
writeConfig.ProtobufMessage = config.RemoteWriteProtoMsgV2
|
|
|
|
conf := &config.Config{
|
|
GlobalConfig: config.DefaultGlobalConfig,
|
|
RemoteWriteConfigs: []*config.RemoteWriteConfig{
|
|
writeConfig,
|
|
},
|
|
}
|
|
|
|
num := 3
|
|
_, series := createTimeseries(0, num)
|
|
metadata := createSeriesMetadata(series)
|
|
|
|
require.NoError(t, s.ApplyConfig(conf))
|
|
hash, err := toHash(writeConfig)
|
|
require.NoError(t, err)
|
|
qm := s.rws.queues[hash]
|
|
|
|
c := NewTestWriteClient(config.RemoteWriteProtoMsgV1)
|
|
qm.SetClient(c)
|
|
|
|
qm.StoreSeries(series, 0)
|
|
qm.StoreMetadata(metadata)
|
|
|
|
require.Len(t, qm.seriesLabels, num)
|
|
require.Len(t, qm.seriesMetadata, num)
|
|
|
|
c.waitForExpectedData(t, 30*time.Second)
|
|
}
|
|
|
|
func TestSampleDeliveryTimeout(t *testing.T) {
|
|
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
|
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
|
// Let's send one less sample than batch size, and wait the timeout duration
|
|
n := 9
|
|
samples, series := createTimeseries(n, n)
|
|
cfg := testDefaultQueueConfig()
|
|
mcfg := config.DefaultMetadataConfig
|
|
cfg.MaxShards = 1
|
|
|
|
c := NewTestWriteClient(protoMsg)
|
|
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg)
|
|
m.StoreSeries(series, 0)
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
// Send the samples twice, waiting for the samples in the meantime.
|
|
c.expectSamples(samples, series)
|
|
m.Append(samples)
|
|
c.waitForExpectedData(t, 30*time.Second)
|
|
|
|
c.expectSamples(samples, series)
|
|
m.Append(samples)
|
|
c.waitForExpectedData(t, 30*time.Second)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSampleDeliveryOrder(t *testing.T) {
|
|
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
|
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
|
ts := 10
|
|
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
|
|
samples := make([]record.RefSample, 0, n)
|
|
series := make([]record.RefSeries, 0, n)
|
|
for i := 0; i < n; i++ {
|
|
name := fmt.Sprintf("test_metric_%d", i%ts)
|
|
samples = append(samples, record.RefSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(i),
|
|
V: float64(i),
|
|
})
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: labels.FromStrings("__name__", name),
|
|
})
|
|
}
|
|
|
|
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
|
|
c.expectSamples(samples, series)
|
|
m.StoreSeries(series, 0)
|
|
|
|
m.Start()
|
|
defer m.Stop()
|
|
// These should be received by the client.
|
|
m.Append(samples)
|
|
c.waitForExpectedData(t, 30*time.Second)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestShutdown(t *testing.T) {
|
|
deadline := 1 * time.Second
|
|
c := NewTestBlockedWriteClient()
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
|
|
m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1)
|
|
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
|
|
samples, series := createTimeseries(n, n)
|
|
m.StoreSeries(series, 0)
|
|
m.Start()
|
|
|
|
// Append blocks to guarantee delivery, so we do it in the background.
|
|
go func() {
|
|
m.Append(samples)
|
|
}()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Test to ensure that Stop doesn't block.
|
|
start := time.Now()
|
|
m.Stop()
|
|
// The samples will never be delivered, so duration should
|
|
// be at least equal to deadline, otherwise the flush deadline
|
|
// was not respected.
|
|
duration := time.Since(start)
|
|
if duration > deadline+(deadline/10) {
|
|
t.Errorf("Took too long to shutdown: %s > %s", duration, deadline)
|
|
}
|
|
if duration < deadline {
|
|
t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline)
|
|
}
|
|
}
|
|
|
|
func TestSeriesReset(t *testing.T) {
|
|
c := NewTestBlockedWriteClient()
|
|
deadline := 5 * time.Second
|
|
numSegments := 4
|
|
numSeries := 25
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1)
|
|
for i := 0; i < numSegments; i++ {
|
|
series := []record.RefSeries{}
|
|
for j := 0; j < numSeries; j++ {
|
|
series = append(series, record.RefSeries{Ref: chunks.HeadSeriesRef((i * 100) + j), Labels: labels.FromStrings("a", "a")})
|
|
}
|
|
m.StoreSeries(series, i)
|
|
}
|
|
require.Len(t, m.seriesLabels, numSegments*numSeries)
|
|
m.SeriesReset(2)
|
|
require.Len(t, m.seriesLabels, numSegments*numSeries/2)
|
|
}
|
|
|
|
func TestReshard(t *testing.T) {
|
|
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
|
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
|
size := 10 // Make bigger to find more races.
|
|
nSeries := 6
|
|
nSamples := config.DefaultQueueConfig.Capacity * size
|
|
samples, series := createTimeseries(nSamples, nSeries)
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
cfg.MaxShards = 1
|
|
|
|
c := NewTestWriteClient(protoMsg)
|
|
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c, protoMsg)
|
|
c.expectSamples(samples, series)
|
|
m.StoreSeries(series, 0)
|
|
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
go func() {
|
|
for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity {
|
|
sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity])
|
|
require.True(t, sent, "samples not sent")
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}()
|
|
|
|
for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
|
|
m.shards.stop()
|
|
m.shards.start(i)
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
c.waitForExpectedData(t, 30*time.Second)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestReshardRaceWithStop(t *testing.T) {
|
|
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
|
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
|
c := NewTestWriteClient(protoMsg)
|
|
var m *QueueManager
|
|
h := sync.Mutex{}
|
|
h.Lock()
|
|
|
|
cfg := testDefaultQueueConfig()
|
|
mcfg := config.DefaultMetadataConfig
|
|
exitCh := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg)
|
|
|
|
m.Start()
|
|
h.Unlock()
|
|
h.Lock()
|
|
m.Stop()
|
|
|
|
select {
|
|
case exitCh <- struct{}{}:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
for i := 1; i < 100; i++ {
|
|
h.Lock()
|
|
m.reshardChan <- i
|
|
h.Unlock()
|
|
}
|
|
<-exitCh
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestReshardPartialBatch(t *testing.T) {
|
|
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
|
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
|
samples, series := createTimeseries(1, 10)
|
|
|
|
c := NewTestBlockedWriteClient()
|
|
|
|
cfg := testDefaultQueueConfig()
|
|
mcfg := config.DefaultMetadataConfig
|
|
cfg.MaxShards = 1
|
|
batchSendDeadline := time.Millisecond
|
|
flushDeadline := 10 * time.Millisecond
|
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
|
|
|
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
|
|
m.StoreSeries(series, 0)
|
|
|
|
m.Start()
|
|
|
|
for i := 0; i < 100; i++ {
|
|
done := make(chan struct{})
|
|
go func() {
|
|
m.Append(samples)
|
|
time.Sleep(batchSendDeadline)
|
|
m.shards.stop()
|
|
m.shards.start(1)
|
|
done <- struct{}{}
|
|
}()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(2 * time.Second):
|
|
t.Error("Deadlock between sending and stopping detected")
|
|
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
|
t.FailNow()
|
|
}
|
|
}
|
|
// We can only call stop if there was not a deadlock.
|
|
m.Stop()
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestQueueFilledDeadlock makes sure the code does not deadlock in the case
|
|
// where a large scrape (> capacity + max samples per send) is appended at the
|
|
// same time as a batch times out according to the batch send deadline.
|
|
func TestQueueFilledDeadlock(t *testing.T) {
|
|
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
|
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
|
samples, series := createTimeseries(50, 1)
|
|
|
|
c := NewNopWriteClient()
|
|
|
|
cfg := testDefaultQueueConfig()
|
|
mcfg := config.DefaultMetadataConfig
|
|
cfg.MaxShards = 1
|
|
cfg.MaxSamplesPerSend = 10
|
|
cfg.Capacity = 20
|
|
flushDeadline := time.Second
|
|
batchSendDeadline := time.Millisecond
|
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
|
|
|
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
|
|
m.StoreSeries(series, 0)
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
for i := 0; i < 100; i++ {
|
|
done := make(chan struct{})
|
|
go func() {
|
|
time.Sleep(batchSendDeadline)
|
|
m.Append(samples)
|
|
done <- struct{}{}
|
|
}()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(2 * time.Second):
|
|
t.Error("Deadlock between sending and appending detected")
|
|
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
|
t.FailNow()
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestReleaseNoninternedString(t *testing.T) {
|
|
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
|
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
|
|
m.Start()
|
|
defer m.Stop()
|
|
for i := 1; i < 1000; i++ {
|
|
m.StoreSeries([]record.RefSeries{
|
|
{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: labels.FromStrings("asdf", strconv.Itoa(i)),
|
|
},
|
|
}, 0)
|
|
m.SeriesReset(1)
|
|
}
|
|
|
|
metric := client_testutil.ToFloat64(noReferenceReleases)
|
|
require.Equal(t, 0.0, metric, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestShouldReshard(t *testing.T) {
|
|
type testcase struct {
|
|
startingShards int
|
|
samplesIn, samplesOut, lastSendTimestamp int64
|
|
expectedToReshard bool
|
|
sendDeadline model.Duration
|
|
}
|
|
cases := []testcase{
|
|
{
|
|
// resharding shouldn't take place if we haven't successfully sent
|
|
// since the last shardUpdateDuration, even if the send deadline is very low
|
|
startingShards: 10,
|
|
samplesIn: 1000,
|
|
samplesOut: 10,
|
|
lastSendTimestamp: time.Now().Unix() - int64(shardUpdateDuration),
|
|
expectedToReshard: false,
|
|
sendDeadline: model.Duration(100 * time.Millisecond),
|
|
},
|
|
{
|
|
startingShards: 10,
|
|
samplesIn: 1000,
|
|
samplesOut: 10,
|
|
lastSendTimestamp: time.Now().Unix(),
|
|
expectedToReshard: true,
|
|
sendDeadline: config.DefaultQueueConfig.BatchSendDeadline,
|
|
},
|
|
}
|
|
|
|
for _, c := range cases {
|
|
_, m := newTestClientAndQueueManager(t, time.Duration(c.sendDeadline), config.RemoteWriteProtoMsgV1)
|
|
m.numShards = c.startingShards
|
|
m.dataIn.incr(c.samplesIn)
|
|
m.dataOut.incr(c.samplesOut)
|
|
m.lastSendTimestamp.Store(c.lastSendTimestamp)
|
|
m.Start()
|
|
|
|
desiredShards := m.calculateDesiredShards()
|
|
shouldReshard := m.shouldReshard(desiredShards)
|
|
|
|
m.Stop()
|
|
|
|
require.Equal(t, c.expectedToReshard, shouldReshard)
|
|
}
|
|
}
|
|
|
|
// TestDisableReshardOnRetry asserts that resharding should be disabled when a
|
|
// recoverable error is returned from remote_write.
|
|
func TestDisableReshardOnRetry(t *testing.T) {
|
|
onStoredContext, onStoreCalled := context.WithCancel(context.Background())
|
|
defer onStoreCalled()
|
|
|
|
var (
|
|
fakeSamples, fakeSeries = createTimeseries(100, 100)
|
|
|
|
cfg = config.DefaultQueueConfig
|
|
mcfg = config.DefaultMetadataConfig
|
|
retryAfter = time.Second
|
|
|
|
metrics = newQueueManagerMetrics(nil, "", "")
|
|
|
|
client = &MockWriteClient{
|
|
StoreFunc: func(ctx context.Context, b []byte, i int) (WriteResponseStats, error) {
|
|
onStoreCalled()
|
|
|
|
return WriteResponseStats{}, RecoverableError{
|
|
error: errors.New("fake error"),
|
|
retryAfter: model.Duration(retryAfter),
|
|
}
|
|
},
|
|
NameFunc: func() string { return "mock" },
|
|
EndpointFunc: func() string { return "http://fake:9090/api/v1/write" },
|
|
}
|
|
)
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
|
m.StoreSeries(fakeSeries, 0)
|
|
|
|
// Attempt to samples while the manager is running. We immediately stop the
|
|
// manager after the recoverable error is generated to prevent the manager
|
|
// from resharding itself.
|
|
m.Start()
|
|
{
|
|
m.Append(fakeSamples)
|
|
|
|
select {
|
|
case <-onStoredContext.Done():
|
|
case <-time.After(time.Minute):
|
|
require.FailNow(t, "timed out waiting for client to be sent metrics")
|
|
}
|
|
}
|
|
m.Stop()
|
|
|
|
require.Eventually(t, func() bool {
|
|
// Force m.lastSendTimestamp to be current so the last send timestamp isn't
|
|
// the reason resharding is disabled.
|
|
m.lastSendTimestamp.Store(time.Now().Unix())
|
|
return m.shouldReshard(m.numShards+1) == false
|
|
}, time.Minute, 10*time.Millisecond, "shouldReshard was never disabled")
|
|
|
|
// After 2x retryAfter, resharding should be enabled again.
|
|
require.Eventually(t, func() bool {
|
|
// Force m.lastSendTimestamp to be current so the last send timestamp isn't
|
|
// the reason resharding is disabled.
|
|
m.lastSendTimestamp.Store(time.Now().Unix())
|
|
return m.shouldReshard(m.numShards+1) == true
|
|
}, time.Minute, retryAfter, "shouldReshard should have been re-enabled")
|
|
}
|
|
|
|
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
|
|
samples := make([]record.RefSample, 0, numSamples)
|
|
series := make([]record.RefSeries, 0, numSeries)
|
|
lb := labels.NewScratchBuilder(1 + len(extraLabels))
|
|
for i := 0; i < numSeries; i++ {
|
|
name := fmt.Sprintf("test_metric_%d", i)
|
|
for j := 0; j < numSamples; j++ {
|
|
samples = append(samples, record.RefSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(j),
|
|
V: float64(i),
|
|
})
|
|
}
|
|
// Create Labels that is name of series plus any extra labels supplied.
|
|
lb.Reset()
|
|
lb.Add(labels.MetricName, name)
|
|
rand.Shuffle(len(extraLabels), func(i, j int) {
|
|
extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
|
|
})
|
|
for _, l := range extraLabels {
|
|
lb.Add(l.Name, l.Value)
|
|
}
|
|
lb.Sort()
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: lb.Labels(),
|
|
})
|
|
}
|
|
return samples, series
|
|
}
|
|
|
|
func createProtoTimeseriesWithOld(numSamples, baseTs int64, extraLabels ...labels.Label) []prompb.TimeSeries {
|
|
samples := make([]prompb.TimeSeries, numSamples)
|
|
// use a fixed rand source so tests are consistent
|
|
r := rand.New(rand.NewSource(99))
|
|
for j := int64(0); j < numSamples; j++ {
|
|
name := fmt.Sprintf("test_metric_%d", j)
|
|
|
|
samples[j] = prompb.TimeSeries{
|
|
Labels: []prompb.Label{{Name: "__name__", Value: name}},
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: baseTs + j,
|
|
Value: float64(j),
|
|
},
|
|
},
|
|
}
|
|
// 10% of the time use a ts that is too old
|
|
if r.Intn(10) == 0 {
|
|
samples[j].Samples[0].Timestamp = baseTs - 5
|
|
}
|
|
}
|
|
return samples
|
|
}
|
|
|
|
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
|
|
exemplars := make([]record.RefExemplar, 0, numExemplars)
|
|
series := make([]record.RefSeries, 0, numSeries)
|
|
for i := 0; i < numSeries; i++ {
|
|
name := fmt.Sprintf("test_metric_%d", i)
|
|
for j := 0; j < numExemplars; j++ {
|
|
e := record.RefExemplar{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(j),
|
|
V: float64(i),
|
|
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
|
|
}
|
|
exemplars = append(exemplars, e)
|
|
}
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: labels.FromStrings("__name__", name),
|
|
})
|
|
}
|
|
return exemplars, series
|
|
}
|
|
|
|
func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.RefHistogramSample, []record.RefFloatHistogramSample, []record.RefSeries) {
|
|
histograms := make([]record.RefHistogramSample, 0, numSamples)
|
|
floatHistograms := make([]record.RefFloatHistogramSample, 0, numSamples)
|
|
series := make([]record.RefSeries, 0, numSeries)
|
|
for i := 0; i < numSeries; i++ {
|
|
name := fmt.Sprintf("test_metric_%d", i)
|
|
for j := 0; j < numSamples; j++ {
|
|
hist := &histogram.Histogram{
|
|
Schema: 2,
|
|
ZeroThreshold: 1e-128,
|
|
ZeroCount: 0,
|
|
Count: 2,
|
|
Sum: 0,
|
|
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
|
PositiveBuckets: []int64{int64(i) + 1},
|
|
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
|
|
NegativeBuckets: []int64{int64(-i) - 1},
|
|
}
|
|
|
|
if floatHistogram {
|
|
fh := record.RefFloatHistogramSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(j),
|
|
FH: hist.ToFloat(nil),
|
|
}
|
|
floatHistograms = append(floatHistograms, fh)
|
|
} else {
|
|
h := record.RefHistogramSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(j),
|
|
H: hist,
|
|
}
|
|
histograms = append(histograms, h)
|
|
}
|
|
}
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: labels.FromStrings("__name__", name),
|
|
})
|
|
}
|
|
if floatHistogram {
|
|
return nil, floatHistograms, series
|
|
}
|
|
return histograms, nil, series
|
|
}
|
|
|
|
func createSeriesMetadata(series []record.RefSeries) []record.RefMetadata {
|
|
metas := make([]record.RefMetadata, 0, len(series))
|
|
|
|
for _, s := range series {
|
|
metas = append(metas, record.RefMetadata{
|
|
Ref: s.Ref,
|
|
Type: uint8(record.Counter),
|
|
Unit: "unit text",
|
|
Help: "help text",
|
|
})
|
|
}
|
|
return metas
|
|
}
|
|
|
|
func getSeriesIDFromRef(r record.RefSeries) string {
|
|
return r.Labels.String()
|
|
}
|
|
|
|
// TestWriteClient represents write client which does not call remote storage,
|
|
// but instead re-implements fake WriteHandler for test purposes.
|
|
type TestWriteClient struct {
|
|
receivedSamples map[string][]prompb.Sample
|
|
expectedSamples map[string][]prompb.Sample
|
|
receivedExemplars map[string][]prompb.Exemplar
|
|
expectedExemplars map[string][]prompb.Exemplar
|
|
receivedHistograms map[string][]prompb.Histogram
|
|
receivedFloatHistograms map[string][]prompb.Histogram
|
|
expectedHistograms map[string][]prompb.Histogram
|
|
expectedFloatHistograms map[string][]prompb.Histogram
|
|
receivedMetadata map[string][]prompb.MetricMetadata
|
|
writesReceived int
|
|
mtx sync.Mutex
|
|
buf []byte
|
|
protoMsg config.RemoteWriteProtoMsg
|
|
injectedErrs []error
|
|
currErr int
|
|
retry bool
|
|
|
|
storeWait time.Duration
|
|
// TODO(npazosmendez): maybe replaceable with injectedErrs?
|
|
returnError error
|
|
}
|
|
|
|
// NewTestWriteClient creates a new testing write client.
|
|
func NewTestWriteClient(protoMsg config.RemoteWriteProtoMsg) *TestWriteClient {
|
|
return &TestWriteClient{
|
|
receivedSamples: map[string][]prompb.Sample{},
|
|
expectedSamples: map[string][]prompb.Sample{},
|
|
receivedMetadata: map[string][]prompb.MetricMetadata{},
|
|
protoMsg: protoMsg,
|
|
storeWait: 0,
|
|
returnError: nil,
|
|
}
|
|
}
|
|
|
|
func (c *TestWriteClient) injectErrors(injectedErrs []error) {
|
|
c.injectedErrs = injectedErrs
|
|
c.currErr = -1
|
|
c.retry = false
|
|
}
|
|
|
|
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
c.expectedSamples = map[string][]prompb.Sample{}
|
|
c.receivedSamples = map[string][]prompb.Sample{}
|
|
|
|
for _, s := range ss {
|
|
tsID := getSeriesIDFromRef(series[s.Ref])
|
|
c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
|
|
Timestamp: s.T,
|
|
Value: s.V,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
c.expectedExemplars = map[string][]prompb.Exemplar{}
|
|
c.receivedExemplars = map[string][]prompb.Exemplar{}
|
|
|
|
for _, s := range ss {
|
|
tsID := getSeriesIDFromRef(series[s.Ref])
|
|
e := prompb.Exemplar{
|
|
Labels: prompb.FromLabels(s.Labels, nil),
|
|
Timestamp: s.T,
|
|
Value: s.V,
|
|
}
|
|
c.expectedExemplars[tsID] = append(c.expectedExemplars[tsID], e)
|
|
}
|
|
}
|
|
|
|
func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, series []record.RefSeries) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
c.expectedHistograms = map[string][]prompb.Histogram{}
|
|
c.receivedHistograms = map[string][]prompb.Histogram{}
|
|
|
|
for _, h := range hh {
|
|
tsID := getSeriesIDFromRef(series[h.Ref])
|
|
c.expectedHistograms[tsID] = append(c.expectedHistograms[tsID], prompb.FromIntHistogram(h.T, h.H))
|
|
}
|
|
}
|
|
|
|
func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSample, series []record.RefSeries) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
c.expectedFloatHistograms = map[string][]prompb.Histogram{}
|
|
c.receivedFloatHistograms = map[string][]prompb.Histogram{}
|
|
|
|
for _, fh := range fhs {
|
|
tsID := getSeriesIDFromRef(series[fh.Ref])
|
|
c.expectedFloatHistograms[tsID] = append(c.expectedFloatHistograms[tsID], prompb.FromFloatHistogram(fh.T, fh.FH))
|
|
}
|
|
}
|
|
|
|
func deepLen[M any](ms ...map[string][]M) int {
|
|
l := 0
|
|
for _, m := range ms {
|
|
for _, v := range m {
|
|
l += len(v)
|
|
}
|
|
}
|
|
return l
|
|
}
|
|
|
|
func (c *TestWriteClient) waitForExpectedData(tb testing.TB, timeout time.Duration) {
|
|
tb.Helper()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
if err := runutil.Retry(500*time.Millisecond, ctx.Done(), func() error {
|
|
c.mtx.Lock()
|
|
exp := deepLen(c.expectedSamples) + deepLen(c.expectedExemplars) + deepLen(c.expectedHistograms, c.expectedFloatHistograms)
|
|
got := deepLen(c.receivedSamples) + deepLen(c.receivedExemplars) + deepLen(c.receivedHistograms, c.receivedFloatHistograms)
|
|
c.mtx.Unlock()
|
|
|
|
if got < exp {
|
|
return fmt.Errorf("expected %v samples/exemplars/histograms/floathistograms, got %v", exp, got)
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
tb.Error(err)
|
|
}
|
|
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
for ts, expectedSamples := range c.expectedSamples {
|
|
require.Equal(tb, expectedSamples, c.receivedSamples[ts], ts)
|
|
}
|
|
for ts, expectedExemplar := range c.expectedExemplars {
|
|
require.Equal(tb, expectedExemplar, c.receivedExemplars[ts], ts)
|
|
}
|
|
for ts, expectedHistogram := range c.expectedHistograms {
|
|
require.Equal(tb, expectedHistogram, c.receivedHistograms[ts], ts)
|
|
}
|
|
for ts, expectedFloatHistogram := range c.expectedFloatHistograms {
|
|
require.Equal(tb, expectedFloatHistogram, c.receivedFloatHistograms[ts], ts)
|
|
}
|
|
}
|
|
|
|
func (c *TestWriteClient) SetStoreWait(w time.Duration) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
c.storeWait = w
|
|
}
|
|
|
|
func (c *TestWriteClient) SetReturnError(err error) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
c.returnError = err
|
|
}
|
|
|
|
func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResponseStats, error) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
if c.storeWait > 0 {
|
|
time.Sleep(c.storeWait)
|
|
}
|
|
if c.returnError != nil {
|
|
return WriteResponseStats{}, c.returnError
|
|
}
|
|
// nil buffers are ok for snappy, ignore cast error.
|
|
if c.buf != nil {
|
|
c.buf = c.buf[:cap(c.buf)]
|
|
}
|
|
|
|
reqBuf, err := snappy.Decode(c.buf, req)
|
|
c.buf = reqBuf
|
|
if err != nil {
|
|
return WriteResponseStats{}, err
|
|
}
|
|
|
|
// Check if we've been told to inject err for this call.
|
|
if len(c.injectedErrs) > 0 {
|
|
c.currErr++
|
|
if err = c.injectedErrs[c.currErr]; err != nil {
|
|
return WriteResponseStats{}, err
|
|
}
|
|
}
|
|
|
|
var reqProto *prompb.WriteRequest
|
|
switch c.protoMsg {
|
|
case config.RemoteWriteProtoMsgV1:
|
|
reqProto = &prompb.WriteRequest{}
|
|
err = proto.Unmarshal(reqBuf, reqProto)
|
|
case config.RemoteWriteProtoMsgV2:
|
|
// NOTE(bwplotka): v1 msg can be unmarshaled to v2 sometimes, without
|
|
// errors.
|
|
var reqProtoV2 writev2.Request
|
|
err = proto.Unmarshal(reqBuf, &reqProtoV2)
|
|
if err == nil {
|
|
reqProto, err = v2RequestToWriteRequest(&reqProtoV2)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return WriteResponseStats{}, err
|
|
}
|
|
|
|
rs := WriteResponseStats{}
|
|
b := labels.NewScratchBuilder(0)
|
|
for _, ts := range reqProto.Timeseries {
|
|
labels := ts.ToLabels(&b, nil)
|
|
tsID := labels.String()
|
|
if len(ts.Samples) > 0 {
|
|
c.receivedSamples[tsID] = append(c.receivedSamples[tsID], ts.Samples...)
|
|
}
|
|
rs.Samples += len(ts.Samples)
|
|
|
|
if len(ts.Exemplars) > 0 {
|
|
c.receivedExemplars[tsID] = append(c.receivedExemplars[tsID], ts.Exemplars...)
|
|
}
|
|
rs.Exemplars += len(ts.Exemplars)
|
|
|
|
for _, h := range ts.Histograms {
|
|
if h.IsFloatHistogram() {
|
|
c.receivedFloatHistograms[tsID] = append(c.receivedFloatHistograms[tsID], h)
|
|
} else {
|
|
c.receivedHistograms[tsID] = append(c.receivedHistograms[tsID], h)
|
|
}
|
|
}
|
|
rs.Histograms += len(ts.Histograms)
|
|
}
|
|
for _, m := range reqProto.Metadata {
|
|
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
|
|
}
|
|
|
|
c.writesReceived++
|
|
return rs, nil
|
|
}
|
|
|
|
func (c *TestWriteClient) Name() string {
|
|
return "testwriteclient"
|
|
}
|
|
|
|
func (c *TestWriteClient) Endpoint() string {
|
|
return "http://test-remote.com/1234"
|
|
}
|
|
|
|
func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, error) {
|
|
req := &prompb.WriteRequest{
|
|
Timeseries: make([]prompb.TimeSeries, len(v2Req.Timeseries)),
|
|
// TODO handle metadata?
|
|
}
|
|
b := labels.NewScratchBuilder(0)
|
|
for i, rts := range v2Req.Timeseries {
|
|
rts.ToLabels(&b, v2Req.Symbols).Range(func(l labels.Label) {
|
|
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
|
|
Name: l.Name,
|
|
Value: l.Value,
|
|
})
|
|
})
|
|
|
|
exemplars := make([]prompb.Exemplar, len(rts.Exemplars))
|
|
for j, e := range rts.Exemplars {
|
|
exemplars[j].Value = e.Value
|
|
exemplars[j].Timestamp = e.Timestamp
|
|
e.ToExemplar(&b, v2Req.Symbols).Labels.Range(func(l labels.Label) {
|
|
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
|
|
Name: l.Name,
|
|
Value: l.Value,
|
|
})
|
|
})
|
|
}
|
|
req.Timeseries[i].Exemplars = exemplars
|
|
|
|
req.Timeseries[i].Samples = make([]prompb.Sample, len(rts.Samples))
|
|
for j, s := range rts.Samples {
|
|
req.Timeseries[i].Samples[j].Timestamp = s.Timestamp
|
|
req.Timeseries[i].Samples[j].Value = s.Value
|
|
}
|
|
|
|
req.Timeseries[i].Histograms = make([]prompb.Histogram, len(rts.Histograms))
|
|
for j, h := range rts.Histograms {
|
|
if h.IsFloatHistogram() {
|
|
req.Timeseries[i].Histograms[j] = prompb.FromFloatHistogram(h.Timestamp, h.ToFloatHistogram())
|
|
continue
|
|
}
|
|
req.Timeseries[i].Histograms[j] = prompb.FromIntHistogram(h.Timestamp, h.ToIntHistogram())
|
|
}
|
|
}
|
|
return req, nil
|
|
}
|
|
|
|
// TestBlockingWriteClient is a queue_manager WriteClient which will block
|
|
// on any calls to Store(), until the request's Context is cancelled, at which
|
|
// point the `numCalls` property will contain a count of how many times Store()
|
|
// was called.
|
|
type TestBlockingWriteClient struct {
|
|
numCalls atomic.Uint64
|
|
}
|
|
|
|
func NewTestBlockedWriteClient() *TestBlockingWriteClient {
|
|
return &TestBlockingWriteClient{}
|
|
}
|
|
|
|
func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) (WriteResponseStats, error) {
|
|
c.numCalls.Inc()
|
|
<-ctx.Done()
|
|
return WriteResponseStats{}, nil
|
|
}
|
|
|
|
func (c *TestBlockingWriteClient) NumCalls() uint64 {
|
|
return c.numCalls.Load()
|
|
}
|
|
|
|
func (c *TestBlockingWriteClient) Name() string {
|
|
return "testblockingwriteclient"
|
|
}
|
|
|
|
func (c *TestBlockingWriteClient) Endpoint() string {
|
|
return "http://test-remote-blocking.com/1234"
|
|
}
|
|
|
|
// For benchmarking the send and not the receive side.
|
|
type NopWriteClient struct{}
|
|
|
|
func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} }
|
|
func (c *NopWriteClient) Store(context.Context, []byte, int) (WriteResponseStats, error) {
|
|
return WriteResponseStats{}, nil
|
|
}
|
|
func (c *NopWriteClient) Name() string { return "nopwriteclient" }
|
|
func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }
|
|
|
|
type MockWriteClient struct {
|
|
StoreFunc func(context.Context, []byte, int) (WriteResponseStats, error)
|
|
NameFunc func() string
|
|
EndpointFunc func() string
|
|
}
|
|
|
|
func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int) (WriteResponseStats, error) {
|
|
return c.StoreFunc(ctx, bb, n)
|
|
}
|
|
func (c *MockWriteClient) Name() string { return c.NameFunc() }
|
|
func (c *MockWriteClient) Endpoint() string { return c.EndpointFunc() }
|
|
|
|
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics.
|
|
var extraLabels []labels.Label = []labels.Label{
|
|
{Name: "kubernetes_io_arch", Value: "amd64"},
|
|
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"},
|
|
{Name: "kubernetes_io_os", Value: "linux"},
|
|
{Name: "container_name", Value: "some-name"},
|
|
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"},
|
|
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"},
|
|
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
|
|
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
|
|
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
|
|
{Name: "job", Value: "kubernetes-cadvisor"},
|
|
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
|
|
{Name: "monitor", Value: "prod"},
|
|
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
|
|
{Name: "namespace", Value: "kube-system"},
|
|
{Name: "pod_name", Value: "some-other-name-5j8s8"},
|
|
}
|
|
|
|
func BenchmarkSampleSend(b *testing.B) {
|
|
// Send one sample per series, which is the typical remote_write case
|
|
const numSamples = 1
|
|
const numSeries = 10000
|
|
|
|
samples, series := createTimeseries(numSamples, numSeries, extraLabels...)
|
|
|
|
c := NewNopWriteClient()
|
|
|
|
cfg := testDefaultQueueConfig()
|
|
mcfg := config.DefaultMetadataConfig
|
|
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
|
|
cfg.MinShards = 20
|
|
cfg.MaxShards = 20
|
|
|
|
// todo: test with new proto type(s)
|
|
for _, format := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
|
b.Run(string(format), func(b *testing.B) {
|
|
m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c, format)
|
|
m.StoreSeries(series, 0)
|
|
|
|
// These should be received by the client.
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
m.Append(samples)
|
|
m.UpdateSeriesSegment(series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does
|
|
m.SeriesReset(i + 1)
|
|
}
|
|
// Do not include shutdown
|
|
b.StopTimer()
|
|
})
|
|
}
|
|
}
|
|
|
|
// Check how long it takes to add N series, including external labels processing.
|
|
func BenchmarkStoreSeries(b *testing.B) {
|
|
externalLabels := []labels.Label{
|
|
{Name: "cluster", Value: "mycluster"},
|
|
{Name: "replica", Value: "1"},
|
|
}
|
|
relabelConfigs := []*relabel.Config{{
|
|
SourceLabels: model.LabelNames{"namespace"},
|
|
Separator: ";",
|
|
Regex: relabel.MustNewRegexp("kube.*"),
|
|
TargetLabel: "job",
|
|
Replacement: "$1",
|
|
Action: relabel.Replace,
|
|
}}
|
|
testCases := []struct {
|
|
name string
|
|
externalLabels []labels.Label
|
|
ts []prompb.TimeSeries
|
|
relabelConfigs []*relabel.Config
|
|
}{
|
|
{name: "plain"},
|
|
{name: "externalLabels", externalLabels: externalLabels},
|
|
{name: "relabel", relabelConfigs: relabelConfigs},
|
|
{
|
|
name: "externalLabels+relabel",
|
|
externalLabels: externalLabels,
|
|
relabelConfigs: relabelConfigs,
|
|
},
|
|
}
|
|
|
|
// numSeries chosen to be big enough that StoreSeries dominates creating a new queue manager.
|
|
const numSeries = 1000
|
|
_, series := createTimeseries(0, numSeries, extraLabels...)
|
|
|
|
for _, tc := range testCases {
|
|
b.Run(tc.name, func(b *testing.B) {
|
|
for i := 0; i < b.N; i++ {
|
|
c := NewTestWriteClient(config.RemoteWriteProtoMsgV1)
|
|
dir := b.TempDir()
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
|
m.externalLabels = tc.externalLabels
|
|
m.relabelConfigs = tc.relabelConfigs
|
|
|
|
m.StoreSeries(series, 0)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func BenchmarkStartup(b *testing.B) {
|
|
dir := os.Getenv("WALDIR")
|
|
if dir == "" {
|
|
b.Skip("WALDIR env var not set")
|
|
}
|
|
|
|
// Find the second largest segment; we will replay up to this.
|
|
// (Second largest as WALWatcher will start tailing the largest).
|
|
dirents, err := os.ReadDir(dir)
|
|
require.NoError(b, err)
|
|
|
|
var segments []int
|
|
for _, dirent := range dirents {
|
|
if i, err := strconv.Atoi(dirent.Name()); err != nil {
|
|
segments = append(segments, i)
|
|
}
|
|
}
|
|
sort.Ints(segments)
|
|
|
|
logger := promslog.New(&promslog.Config{})
|
|
|
|
cfg := testDefaultQueueConfig()
|
|
mcfg := config.DefaultMetadataConfig
|
|
for n := 0; n < b.N; n++ {
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
c := NewTestBlockedWriteClient()
|
|
// todo: test with new proto type(s)
|
|
m := NewQueueManager(metrics, nil, nil, logger, dir,
|
|
newEWMARate(ewmaWeight, shardUpdateDuration),
|
|
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
|
|
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
|
|
m.watcher.MaxSegment = segments[len(segments)-2]
|
|
err := m.watcher.Run()
|
|
require.NoError(b, err)
|
|
}
|
|
}
|
|
|
|
func TestProcessExternalLabels(t *testing.T) {
|
|
b := labels.NewBuilder(labels.EmptyLabels())
|
|
for i, tc := range []struct {
|
|
labels labels.Labels
|
|
externalLabels []labels.Label
|
|
expected labels.Labels
|
|
}{
|
|
// Test adding labels at the end.
|
|
{
|
|
labels: labels.FromStrings("a", "b"),
|
|
externalLabels: []labels.Label{{Name: "c", Value: "d"}},
|
|
expected: labels.FromStrings("a", "b", "c", "d"),
|
|
},
|
|
|
|
// Test adding labels at the beginning.
|
|
{
|
|
labels: labels.FromStrings("c", "d"),
|
|
externalLabels: []labels.Label{{Name: "a", Value: "b"}},
|
|
expected: labels.FromStrings("a", "b", "c", "d"),
|
|
},
|
|
|
|
// Test we don't override existing labels.
|
|
{
|
|
labels: labels.FromStrings("a", "b"),
|
|
externalLabels: []labels.Label{{Name: "a", Value: "c"}},
|
|
expected: labels.FromStrings("a", "b"),
|
|
},
|
|
|
|
// Test empty externalLabels.
|
|
{
|
|
labels: labels.FromStrings("a", "b"),
|
|
externalLabels: []labels.Label{},
|
|
expected: labels.FromStrings("a", "b"),
|
|
},
|
|
|
|
// Test empty labels.
|
|
{
|
|
labels: labels.EmptyLabels(),
|
|
externalLabels: []labels.Label{{Name: "a", Value: "b"}},
|
|
expected: labels.FromStrings("a", "b"),
|
|
},
|
|
|
|
// Test labels is longer than externalLabels.
|
|
{
|
|
labels: labels.FromStrings("a", "b", "c", "d"),
|
|
externalLabels: []labels.Label{{Name: "e", Value: "f"}},
|
|
expected: labels.FromStrings("a", "b", "c", "d", "e", "f"),
|
|
},
|
|
|
|
// Test externalLabels is longer than labels.
|
|
{
|
|
labels: labels.FromStrings("c", "d"),
|
|
externalLabels: []labels.Label{{Name: "a", Value: "b"}, {Name: "e", Value: "f"}},
|
|
expected: labels.FromStrings("a", "b", "c", "d", "e", "f"),
|
|
},
|
|
|
|
// Adding with and without clashing labels.
|
|
{
|
|
labels: labels.FromStrings("a", "b", "c", "d"),
|
|
externalLabels: []labels.Label{{Name: "a", Value: "xxx"}, {Name: "c", Value: "yyy"}, {Name: "e", Value: "f"}},
|
|
expected: labels.FromStrings("a", "b", "c", "d", "e", "f"),
|
|
},
|
|
} {
|
|
b.Reset(tc.labels)
|
|
processExternalLabels(b, tc.externalLabels)
|
|
testutil.RequireEqual(t, tc.expected, b.Labels(), "test %d", i)
|
|
}
|
|
}
|
|
|
|
func TestCalculateDesiredShards(t *testing.T) {
|
|
cfg := config.DefaultQueueConfig
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
|
|
samplesIn := m.dataIn
|
|
|
|
// Need to start the queue manager so the proper metrics are initialized.
|
|
// However we can stop it right away since we don't need to do any actual
|
|
// processing.
|
|
m.Start()
|
|
m.Stop()
|
|
|
|
inputRate := int64(50000)
|
|
var pendingSamples int64
|
|
|
|
// Two minute startup, no samples are sent.
|
|
startedAt := time.Now().Add(-2 * time.Minute)
|
|
|
|
// helper function for adding samples.
|
|
addSamples := func(s int64, ts time.Duration) {
|
|
pendingSamples += s
|
|
samplesIn.incr(s)
|
|
samplesIn.tick()
|
|
|
|
m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix()))
|
|
}
|
|
|
|
// helper function for sending samples.
|
|
sendSamples := func(s int64, ts time.Duration) {
|
|
pendingSamples -= s
|
|
m.dataOut.incr(s)
|
|
m.dataOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration))
|
|
|
|
// highest sent is how far back pending samples would be at our input rate.
|
|
highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
|
|
m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix()))
|
|
|
|
m.lastSendTimestamp.Store(time.Now().Unix())
|
|
}
|
|
|
|
ts := time.Duration(0)
|
|
for ; ts < 120*time.Second; ts += shardUpdateDuration {
|
|
addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts)
|
|
m.numShards = m.calculateDesiredShards()
|
|
require.Equal(t, 1, m.numShards)
|
|
}
|
|
|
|
// Assume 100ms per request, or 10 requests per second per shard.
|
|
// Shard calculation should never drop below barely keeping up.
|
|
minShards := int(inputRate) / cfg.MaxSamplesPerSend / 10
|
|
// This test should never go above 200 shards, that would be more resources than needed.
|
|
maxShards := 200
|
|
|
|
for ; ts < 15*time.Minute; ts += shardUpdateDuration {
|
|
sin := inputRate * int64(shardUpdateDuration/time.Second)
|
|
addSamples(sin, ts)
|
|
|
|
sout := int64(m.numShards*cfg.MaxSamplesPerSend) * int64(shardUpdateDuration/(100*time.Millisecond))
|
|
// You can't send samples that don't exist so cap at the number of pending samples.
|
|
if sout > pendingSamples {
|
|
sout = pendingSamples
|
|
}
|
|
sendSamples(sout, ts)
|
|
|
|
t.Log("desiredShards", m.numShards, "pendingSamples", pendingSamples)
|
|
m.numShards = m.calculateDesiredShards()
|
|
require.GreaterOrEqual(t, m.numShards, minShards, "Shards are too low. desiredShards=%d, minShards=%d, t_seconds=%d", m.numShards, minShards, ts/time.Second)
|
|
require.LessOrEqual(t, m.numShards, maxShards, "Shards are too high. desiredShards=%d, maxShards=%d, t_seconds=%d", m.numShards, maxShards, ts/time.Second)
|
|
}
|
|
require.Equal(t, int64(0), pendingSamples, "Remote write never caught up, there are still %d pending samples.", pendingSamples)
|
|
}
|
|
|
|
func TestCalculateDesiredShardsDetail(t *testing.T) {
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
|
|
samplesIn := m.dataIn
|
|
|
|
for _, tc := range []struct {
|
|
name string
|
|
prevShards int
|
|
dataIn int64 // Quantities normalised to seconds.
|
|
dataOut int64
|
|
dataDropped int64
|
|
dataOutDuration float64
|
|
backlog float64
|
|
expectedShards int
|
|
}{
|
|
{
|
|
name: "nothing in or out 1",
|
|
prevShards: 1,
|
|
expectedShards: 1, // Shards stays the same.
|
|
},
|
|
{
|
|
name: "nothing in or out 10",
|
|
prevShards: 10,
|
|
expectedShards: 10, // Shards stays the same.
|
|
},
|
|
{
|
|
name: "steady throughput",
|
|
prevShards: 1,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 1,
|
|
expectedShards: 1,
|
|
},
|
|
{
|
|
name: "scale down",
|
|
prevShards: 10,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 5,
|
|
expectedShards: 5,
|
|
},
|
|
{
|
|
name: "scale down constrained",
|
|
prevShards: 7,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 5,
|
|
expectedShards: 7,
|
|
},
|
|
{
|
|
name: "scale up",
|
|
prevShards: 1,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 10,
|
|
expectedShards: 10,
|
|
},
|
|
{
|
|
name: "scale up constrained",
|
|
prevShards: 8,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 10,
|
|
expectedShards: 8,
|
|
},
|
|
{
|
|
name: "backlogged 20s",
|
|
prevShards: 2,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 2,
|
|
backlog: 20,
|
|
expectedShards: 4,
|
|
},
|
|
{
|
|
name: "backlogged 90s",
|
|
prevShards: 4,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 4,
|
|
backlog: 90,
|
|
expectedShards: 22,
|
|
},
|
|
{
|
|
name: "backlog reduced",
|
|
prevShards: 22,
|
|
dataIn: 10,
|
|
dataOut: 20,
|
|
dataOutDuration: 4,
|
|
backlog: 10,
|
|
expectedShards: 3,
|
|
},
|
|
{
|
|
name: "backlog eliminated",
|
|
prevShards: 3,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 2,
|
|
backlog: 0,
|
|
expectedShards: 2, // Shard back down.
|
|
},
|
|
{
|
|
name: "slight slowdown",
|
|
prevShards: 1,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 1.2,
|
|
expectedShards: 2, // 1.2 is rounded up to 2.
|
|
},
|
|
{
|
|
name: "bigger slowdown",
|
|
prevShards: 1,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 1.4,
|
|
expectedShards: 2,
|
|
},
|
|
{
|
|
name: "speed up",
|
|
prevShards: 2,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 1.2,
|
|
backlog: 0,
|
|
expectedShards: 2, // No reaction - 1.2 is rounded up to 2.
|
|
},
|
|
{
|
|
name: "speed up more",
|
|
prevShards: 2,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 0.9,
|
|
backlog: 0,
|
|
expectedShards: 1,
|
|
},
|
|
{
|
|
name: "marginal decision A",
|
|
prevShards: 3,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 2.01,
|
|
backlog: 0,
|
|
expectedShards: 3, // 2.01 rounds up to 3.
|
|
},
|
|
{
|
|
name: "marginal decision B",
|
|
prevShards: 3,
|
|
dataIn: 10,
|
|
dataOut: 10,
|
|
dataOutDuration: 1.99,
|
|
backlog: 0,
|
|
expectedShards: 2, // 1.99 rounds up to 2.
|
|
},
|
|
} {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
m.numShards = tc.prevShards
|
|
forceEMWA(samplesIn, tc.dataIn*int64(shardUpdateDuration/time.Second))
|
|
samplesIn.tick()
|
|
forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second))
|
|
forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second))
|
|
forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration)))
|
|
m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value.
|
|
|
|
require.Equal(t, tc.expectedShards, m.calculateDesiredShards())
|
|
})
|
|
}
|
|
}
|
|
|
|
func forceEMWA(r *ewmaRate, rate int64) {
|
|
r.init = false
|
|
r.newEvents.Store(rate)
|
|
}
|
|
|
|
func TestQueueManagerMetrics(t *testing.T) {
|
|
reg := prometheus.NewPedanticRegistry()
|
|
metrics := newQueueManagerMetrics(reg, "name", "http://localhost:1234")
|
|
|
|
// Make sure metrics pass linting.
|
|
problems, err := client_testutil.GatherAndLint(reg)
|
|
require.NoError(t, err)
|
|
require.Empty(t, problems, "Metric linting problems detected: %v", problems)
|
|
|
|
// Make sure all metrics were unregistered. A failure here means you need
|
|
// unregister a metric in `queueManagerMetrics.unregister()`.
|
|
metrics.unregister()
|
|
err = client_testutil.GatherAndCompare(reg, strings.NewReader(""))
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) {
|
|
capacity := 100
|
|
batchSize := 10
|
|
queue := newQueue(batchSize, capacity)
|
|
for i := 0; i < capacity+batchSize; i++ {
|
|
queue.Append(timeSeries{})
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
go queue.FlushAndShutdown(done)
|
|
go func() {
|
|
// Give enough time for FlushAndShutdown to acquire the lock. queue.Batch()
|
|
// should not block forever even if the lock is acquired.
|
|
time.Sleep(10 * time.Millisecond)
|
|
queue.Batch()
|
|
close(done)
|
|
}()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(2 * time.Second):
|
|
t.Error("Deadlock in FlushAndShutdown detected")
|
|
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
|
t.FailNow()
|
|
}
|
|
}
|
|
|
|
func createDummyTimeSeries(instances int) []timeSeries {
|
|
metrics := []labels.Labels{
|
|
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0"),
|
|
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.25"),
|
|
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.5"),
|
|
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "0.75"),
|
|
labels.FromStrings("__name__", "go_gc_duration_seconds", "quantile", "1"),
|
|
labels.FromStrings("__name__", "go_gc_duration_seconds_sum"),
|
|
labels.FromStrings("__name__", "go_gc_duration_seconds_count"),
|
|
labels.FromStrings("__name__", "go_memstats_alloc_bytes_total"),
|
|
labels.FromStrings("__name__", "go_memstats_frees_total"),
|
|
labels.FromStrings("__name__", "go_memstats_lookups_total"),
|
|
labels.FromStrings("__name__", "go_memstats_mallocs_total"),
|
|
labels.FromStrings("__name__", "go_goroutines"),
|
|
labels.FromStrings("__name__", "go_info", "version", "go1.19.3"),
|
|
labels.FromStrings("__name__", "go_memstats_alloc_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_buck_hash_sys_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_gc_sys_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_heap_alloc_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_heap_idle_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_heap_inuse_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_heap_objects"),
|
|
labels.FromStrings("__name__", "go_memstats_heap_released_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_heap_sys_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_last_gc_time_seconds"),
|
|
labels.FromStrings("__name__", "go_memstats_mcache_inuse_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_mcache_sys_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_mspan_inuse_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_mspan_sys_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_next_gc_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_other_sys_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_stack_inuse_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_stack_sys_bytes"),
|
|
labels.FromStrings("__name__", "go_memstats_sys_bytes"),
|
|
labels.FromStrings("__name__", "go_threads"),
|
|
}
|
|
|
|
commonLabels := labels.FromStrings(
|
|
"cluster", "some-cluster-0",
|
|
"container", "prometheus",
|
|
"job", "some-namespace/prometheus",
|
|
"namespace", "some-namespace")
|
|
|
|
var result []timeSeries
|
|
r := rand.New(rand.NewSource(0))
|
|
for i := 0; i < instances; i++ {
|
|
b := labels.NewBuilder(commonLabels)
|
|
b.Set("pod", "prometheus-"+strconv.Itoa(i))
|
|
for _, lbls := range metrics {
|
|
lbls.Range(func(l labels.Label) {
|
|
b.Set(l.Name, l.Value)
|
|
})
|
|
result = append(result, timeSeries{
|
|
seriesLabels: b.Labels(),
|
|
value: r.Float64(),
|
|
})
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func BenchmarkBuildWriteRequest(b *testing.B) {
|
|
noopLogger := promslog.NewNopLogger()
|
|
bench := func(b *testing.B, batch []timeSeries) {
|
|
buff := make([]byte, 0)
|
|
seriesBuff := make([]prompb.TimeSeries, len(batch))
|
|
for i := range seriesBuff {
|
|
seriesBuff[i].Samples = []prompb.Sample{{}}
|
|
seriesBuff[i].Exemplars = []prompb.Exemplar{{}}
|
|
}
|
|
pBuf := proto.NewBuffer(nil)
|
|
|
|
totalSize := 0
|
|
for i := 0; i < b.N; i++ {
|
|
populateTimeSeries(batch, seriesBuff, true, true)
|
|
req, _, _, err := buildWriteRequest(noopLogger, seriesBuff, nil, pBuf, &buff, nil, "snappy")
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
totalSize += len(req)
|
|
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
|
|
}
|
|
}
|
|
|
|
twoBatch := createDummyTimeSeries(2)
|
|
tenBatch := createDummyTimeSeries(10)
|
|
hundredBatch := createDummyTimeSeries(100)
|
|
|
|
b.Run("2 instances", func(b *testing.B) {
|
|
bench(b, twoBatch)
|
|
})
|
|
|
|
b.Run("10 instances", func(b *testing.B) {
|
|
bench(b, tenBatch)
|
|
})
|
|
|
|
b.Run("1k instances", func(b *testing.B) {
|
|
bench(b, hundredBatch)
|
|
})
|
|
}
|
|
|
|
func BenchmarkBuildV2WriteRequest(b *testing.B) {
|
|
noopLogger := promslog.NewNopLogger()
|
|
bench := func(b *testing.B, batch []timeSeries) {
|
|
symbolTable := writev2.NewSymbolTable()
|
|
buff := make([]byte, 0)
|
|
seriesBuff := make([]writev2.TimeSeries, len(batch))
|
|
for i := range seriesBuff {
|
|
seriesBuff[i].Samples = []writev2.Sample{{}}
|
|
seriesBuff[i].Exemplars = []writev2.Exemplar{{}}
|
|
}
|
|
pBuf := []byte{}
|
|
|
|
totalSize := 0
|
|
for i := 0; i < b.N; i++ {
|
|
populateV2TimeSeries(&symbolTable, batch, seriesBuff, true, true)
|
|
req, _, _, err := buildV2WriteRequest(noopLogger, seriesBuff, symbolTable.Symbols(), &pBuf, &buff, nil, "snappy")
|
|
if err != nil {
|
|
b.Fatal(err)
|
|
}
|
|
totalSize += len(req)
|
|
b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op")
|
|
}
|
|
}
|
|
|
|
twoBatch := createDummyTimeSeries(2)
|
|
tenBatch := createDummyTimeSeries(10)
|
|
hundredBatch := createDummyTimeSeries(100)
|
|
|
|
b.Run("2 instances", func(b *testing.B) {
|
|
bench(b, twoBatch)
|
|
})
|
|
|
|
b.Run("10 instances", func(b *testing.B) {
|
|
bench(b, tenBatch)
|
|
})
|
|
|
|
b.Run("1k instances", func(b *testing.B) {
|
|
bench(b, hundredBatch)
|
|
})
|
|
}
|
|
|
|
func TestDropOldTimeSeries(t *testing.T) {
|
|
size := 10
|
|
nSeries := 6
|
|
nSamples := config.DefaultQueueConfig.Capacity * size
|
|
samples, newSamples, series := createTimeseriesWithOldSamples(nSamples, nSeries)
|
|
|
|
// TODO(alexg): test with new version
|
|
c := NewTestWriteClient(config.RemoteWriteProtoMsgV1)
|
|
c.expectSamples(newSamples, series)
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
mcfg := config.DefaultMetadataConfig
|
|
cfg.MaxShards = 1
|
|
cfg.SampleAgeLimit = model.Duration(60 * time.Second)
|
|
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1)
|
|
m.StoreSeries(series, 0)
|
|
|
|
m.Start()
|
|
defer m.Stop()
|
|
|
|
m.Append(samples)
|
|
c.waitForExpectedData(t, 30*time.Second)
|
|
}
|
|
|
|
func TestIsSampleOld(t *testing.T) {
|
|
currentTime := time.Now()
|
|
require.True(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-61*time.Second))))
|
|
require.False(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second))))
|
|
}
|
|
|
|
// Simulates scenario in which remote write endpoint is down and a subset of samples is dropped due to age limit while backoffing.
|
|
func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
|
|
maxSamplesPerSend := 10
|
|
sampleAgeLimit := time.Second
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
cfg.MaxShards = 1
|
|
cfg.SampleAgeLimit = model.Duration(sampleAgeLimit)
|
|
// Set the batch send deadline to 5 minutes to effectively disable it.
|
|
cfg.BatchSendDeadline = model.Duration(time.Minute * 5)
|
|
cfg.Capacity = 10 * maxSamplesPerSend // more than the amount of data we append in the test
|
|
cfg.MaxBackoff = model.Duration(time.Millisecond * 100)
|
|
cfg.MinBackoff = model.Duration(time.Millisecond * 100)
|
|
cfg.MaxSamplesPerSend = maxSamplesPerSend
|
|
metadataCfg := config.DefaultMetadataConfig
|
|
metadataCfg.Send = true
|
|
metadataCfg.SendInterval = model.Duration(time.Second * 60)
|
|
metadataCfg.MaxSamplesPerSend = maxSamplesPerSend
|
|
c := NewTestWriteClient(config.RemoteWriteProtoMsgV1)
|
|
m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c, config.RemoteWriteProtoMsgV1)
|
|
|
|
m.Start()
|
|
|
|
batchID := 0
|
|
expectedSamples := map[string][]prompb.Sample{}
|
|
|
|
appendData := func(numberOfSeries int, timeAdd time.Duration, shouldBeDropped bool) {
|
|
t.Log(">>>> Appending series ", numberOfSeries, " as batch ID ", batchID, " with timeAdd ", timeAdd, " and should be dropped ", shouldBeDropped)
|
|
samples, series := createTimeseriesWithRandomLabelCount(strconv.Itoa(batchID), numberOfSeries, timeAdd, 9)
|
|
m.StoreSeries(series, batchID)
|
|
sent := m.Append(samples)
|
|
require.True(t, sent, "samples not sent")
|
|
if !shouldBeDropped {
|
|
for _, s := range samples {
|
|
tsID := getSeriesIDFromRef(series[s.Ref])
|
|
expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
|
|
Timestamp: s.T,
|
|
Value: s.V,
|
|
})
|
|
}
|
|
}
|
|
batchID++
|
|
}
|
|
timeShift := -time.Millisecond * 5
|
|
|
|
c.SetReturnError(RecoverableError{context.DeadlineExceeded, defaultBackoff})
|
|
|
|
appendData(maxSamplesPerSend/2, timeShift, true)
|
|
time.Sleep(sampleAgeLimit)
|
|
appendData(maxSamplesPerSend/2, timeShift, true)
|
|
time.Sleep(sampleAgeLimit / 10)
|
|
appendData(maxSamplesPerSend/2, timeShift, true)
|
|
time.Sleep(2 * sampleAgeLimit)
|
|
appendData(2*maxSamplesPerSend, timeShift, false)
|
|
time.Sleep(sampleAgeLimit / 2)
|
|
c.SetReturnError(nil)
|
|
appendData(5, timeShift, false)
|
|
m.Stop()
|
|
|
|
if diff := cmp.Diff(expectedSamples, c.receivedSamples); diff != "" {
|
|
t.Errorf("mismatch (-want +got):\n%s", diff)
|
|
}
|
|
}
|
|
|
|
func createTimeseriesWithRandomLabelCount(id string, seriesCount int, timeAdd time.Duration, maxLabels int) ([]record.RefSample, []record.RefSeries) {
|
|
samples := []record.RefSample{}
|
|
series := []record.RefSeries{}
|
|
// use a fixed rand source so tests are consistent
|
|
r := rand.New(rand.NewSource(99))
|
|
for i := 0; i < seriesCount; i++ {
|
|
s := record.RefSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: time.Now().Add(timeAdd).UnixMilli(),
|
|
V: r.Float64(),
|
|
}
|
|
samples = append(samples, s)
|
|
labelsCount := r.Intn(maxLabels)
|
|
lb := labels.NewScratchBuilder(1 + labelsCount)
|
|
lb.Add("__name__", "batch_"+id+"_id_"+strconv.Itoa(i))
|
|
for j := 1; j < labelsCount+1; j++ {
|
|
// same for both name and value
|
|
label := "batch_" + id + "_label_" + strconv.Itoa(j)
|
|
lb.Add(label, label)
|
|
}
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: lb.Labels(),
|
|
})
|
|
}
|
|
return samples, series
|
|
}
|
|
|
|
func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) {
|
|
newSamples := make([]record.RefSample, 0, numSamples)
|
|
samples := make([]record.RefSample, 0, numSamples)
|
|
series := make([]record.RefSeries, 0, numSeries)
|
|
lb := labels.NewScratchBuilder(1 + len(extraLabels))
|
|
for i := 0; i < numSeries; i++ {
|
|
name := fmt.Sprintf("test_metric_%d", i)
|
|
// We create half of the samples in the past.
|
|
past := timestamp.FromTime(time.Now().Add(-5 * time.Minute))
|
|
for j := 0; j < numSamples/2; j++ {
|
|
samples = append(samples, record.RefSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: past + int64(j),
|
|
V: float64(i),
|
|
})
|
|
}
|
|
for j := 0; j < numSamples/2; j++ {
|
|
sample := record.RefSample{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
T: int64(int(time.Now().UnixMilli()) + j),
|
|
V: float64(i),
|
|
}
|
|
samples = append(samples, sample)
|
|
newSamples = append(newSamples, sample)
|
|
}
|
|
// Create Labels that is name of series plus any extra labels supplied.
|
|
lb.Reset()
|
|
lb.Add(labels.MetricName, name)
|
|
for _, l := range extraLabels {
|
|
lb.Add(l.Name, l.Value)
|
|
}
|
|
lb.Sort()
|
|
series = append(series, record.RefSeries{
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
Labels: lb.Labels(),
|
|
})
|
|
}
|
|
return samples, newSamples, series
|
|
}
|
|
|
|
func filterTsLimit(limit int64, ts prompb.TimeSeries) bool {
|
|
return limit > ts.Samples[0].Timestamp
|
|
}
|
|
|
|
func TestBuildTimeSeries(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
ts []prompb.TimeSeries
|
|
filter func(ts prompb.TimeSeries) bool
|
|
lowestTs int64
|
|
highestTs int64
|
|
droppedSamples int
|
|
responseLen int
|
|
}{
|
|
{
|
|
name: "No filter applied",
|
|
ts: []prompb.TimeSeries{
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567890,
|
|
Value: 1.23,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567891,
|
|
Value: 2.34,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567892,
|
|
Value: 3.34,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
filter: nil,
|
|
responseLen: 3,
|
|
lowestTs: 1234567890,
|
|
highestTs: 1234567892,
|
|
},
|
|
{
|
|
name: "Filter applied, samples in order",
|
|
ts: []prompb.TimeSeries{
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567890,
|
|
Value: 1.23,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567891,
|
|
Value: 2.34,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567892,
|
|
Value: 3.45,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567893,
|
|
Value: 3.45,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
|
|
responseLen: 2,
|
|
lowestTs: 1234567892,
|
|
highestTs: 1234567893,
|
|
droppedSamples: 2,
|
|
},
|
|
{
|
|
name: "Filter applied, samples out of order",
|
|
ts: []prompb.TimeSeries{
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567892,
|
|
Value: 3.45,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567890,
|
|
Value: 1.23,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567893,
|
|
Value: 3.45,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567891,
|
|
Value: 2.34,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
|
|
responseLen: 2,
|
|
lowestTs: 1234567892,
|
|
highestTs: 1234567893,
|
|
droppedSamples: 2,
|
|
},
|
|
{
|
|
name: "Filter applied, samples not consecutive",
|
|
ts: []prompb.TimeSeries{
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567890,
|
|
Value: 1.23,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567892,
|
|
Value: 3.45,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567895,
|
|
Value: 6.78,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Samples: []prompb.Sample{
|
|
{
|
|
Timestamp: 1234567897,
|
|
Value: 6.78,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
|
|
responseLen: 2,
|
|
lowestTs: 1234567895,
|
|
highestTs: 1234567897,
|
|
droppedSamples: 2,
|
|
},
|
|
}
|
|
|
|
// Run the test cases
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter)
|
|
require.NotNil(t, result)
|
|
require.Len(t, result, tc.responseLen)
|
|
require.Equal(t, tc.highestTs, highest)
|
|
require.Equal(t, tc.lowestTs, lowest)
|
|
require.Equal(t, tc.droppedSamples, droppedSamples)
|
|
})
|
|
}
|
|
}
|
|
|
|
func BenchmarkBuildTimeSeries(b *testing.B) {
|
|
// Send one sample per series, which is the typical remote_write case
|
|
const numSamples = 10000
|
|
filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) }
|
|
for i := 0; i < b.N; i++ {
|
|
samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...)
|
|
_, _, result, _, _, _ := buildTimeSeries(samples, filter)
|
|
require.NotNil(b, result)
|
|
}
|
|
}
|