mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1670 lines
48 KiB
1670 lines
48 KiB
// Copyright 2013 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package remote |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"math" |
|
"os" |
|
"runtime/pprof" |
|
"sort" |
|
"strconv" |
|
"strings" |
|
"sync" |
|
"testing" |
|
"time" |
|
|
|
"github.com/go-kit/log" |
|
"github.com/gogo/protobuf/proto" |
|
"github.com/golang/snappy" |
|
"github.com/prometheus/client_golang/prometheus" |
|
client_testutil "github.com/prometheus/client_golang/prometheus/testutil" |
|
"github.com/prometheus/common/model" |
|
"github.com/stretchr/testify/require" |
|
"go.uber.org/atomic" |
|
|
|
"github.com/prometheus/prometheus/config" |
|
"github.com/prometheus/prometheus/model/histogram" |
|
"github.com/prometheus/prometheus/model/labels" |
|
"github.com/prometheus/prometheus/model/relabel" |
|
"github.com/prometheus/prometheus/model/timestamp" |
|
"github.com/prometheus/prometheus/prompb" |
|
"github.com/prometheus/prometheus/scrape" |
|
"github.com/prometheus/prometheus/tsdb/chunks" |
|
"github.com/prometheus/prometheus/tsdb/record" |
|
"github.com/prometheus/prometheus/util/testutil" |
|
) |
|
|
|
const defaultFlushDeadline = 1 * time.Minute |
|
|
|
// newHighestTimestampMetric returns a fresh maxTimestamp gauge tracking the
// highest timestamp seen by the remote storage via the Appender interface.
// A new instance is created per test so gauges are never shared across tests.
func newHighestTimestampMetric() *maxTimestamp {
	return &maxTimestamp{
		Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "highest_timestamp_in_seconds",
			Help:      "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.",
		}),
	}
}
|
|
|
// TestSampleDelivery verifies end-to-end delivery of samples, exemplars,
// histograms, and float histograms through the remote-write Storage: data
// appended in two halves must be fully received by the test client.
func TestSampleDelivery(t *testing.T) {
	testcases := []struct {
		name            string
		samples         bool
		exemplars       bool
		histograms      bool
		floatHistograms bool
	}{
		{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
		{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
		{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
		{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
		{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
	}

	// Let's create an even number of send batches so we don't run into the
	// batch timeout case.
	n := 3

	dir := t.TempDir()

	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
	defer s.Close()

	queueConfig := config.DefaultQueueConfig
	queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
	queueConfig.MaxShards = 1

	// We need to set URL's so that metric creation doesn't panic.
	writeConfig := baseRemoteWriteConfig("http://test-storage.com")
	writeConfig.QueueConfig = queueConfig
	writeConfig.SendExemplars = true
	writeConfig.SendNativeHistograms = true

	conf := &config.Config{
		GlobalConfig: config.DefaultGlobalConfig,
		RemoteWriteConfigs: []*config.RemoteWriteConfig{
			writeConfig,
		},
	}

	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			var (
				series          []record.RefSeries
				samples         []record.RefSample
				exemplars       []record.RefExemplar
				histograms      []record.RefHistogramSample
				floatHistograms []record.RefFloatHistogramSample
			)

			// Generates same series in both cases.
			if tc.samples {
				samples, series = createTimeseries(n, n)
			}
			if tc.exemplars {
				exemplars, series = createExemplars(n, n)
			}
			if tc.histograms {
				histograms, _, series = createHistograms(n, n, false)
			}
			if tc.floatHistograms {
				_, floatHistograms, series = createHistograms(n, n, true)
			}

			// Apply new config. Capacity/MaxSamplesPerSend are sized from the
			// generated data so exactly two sends happen per half below.
			queueConfig.Capacity = len(samples)
			queueConfig.MaxSamplesPerSend = len(samples) / 2
			require.NoError(t, s.ApplyConfig(conf))
			hash, err := toHash(writeConfig)
			require.NoError(t, err)
			qm := s.rws.queues[hash]

			// Swap in the test client so received data can be inspected.
			c := NewTestWriteClient()
			qm.SetClient(c)

			qm.StoreSeries(series, 0)

			// Send first half of data.
			c.expectSamples(samples[:len(samples)/2], series)
			c.expectExemplars(exemplars[:len(exemplars)/2], series)
			c.expectHistograms(histograms[:len(histograms)/2], series)
			c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series)
			qm.Append(samples[:len(samples)/2])
			qm.AppendExemplars(exemplars[:len(exemplars)/2])
			qm.AppendHistograms(histograms[:len(histograms)/2])
			qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
			c.waitForExpectedData(t)

			// Send second half of data.
			c.expectSamples(samples[len(samples)/2:], series)
			c.expectExemplars(exemplars[len(exemplars)/2:], series)
			c.expectHistograms(histograms[len(histograms)/2:], series)
			c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
			qm.Append(samples[len(samples)/2:])
			qm.AppendExemplars(exemplars[len(exemplars)/2:])
			qm.AppendHistograms(histograms[len(histograms)/2:])
			qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
			c.waitForExpectedData(t)
		})
	}
}
|
|
|
func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration) (*TestWriteClient, *QueueManager) { |
|
c := NewTestWriteClient() |
|
cfg := config.DefaultQueueConfig |
|
mcfg := config.DefaultMetadataConfig |
|
return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c) |
|
} |
|
|
|
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient) *QueueManager { |
|
dir := t.TempDir() |
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
return m |
|
} |
|
|
|
// TestMetadataDelivery verifies that metric metadata is shipped in batches of
// MaxSamplesPerSend and that every entry arrives at the client.
func TestMetadataDelivery(t *testing.T) {
	c, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
	m.Start()
	defer m.Stop()

	metadata := []scrape.MetricMetadata{}
	// Chosen so the count is not a multiple of MaxSamplesPerSend, forcing a
	// final partial batch.
	numMetadata := 1532
	for i := 0; i < numMetadata; i++ {
		metadata = append(metadata, scrape.MetricMetadata{
			Metric: "prometheus_remote_storage_sent_metadata_bytes_total_" + strconv.Itoa(i),
			Type:   model.MetricTypeCounter,
			Help:   "a nice help text",
			Unit:   "",
		})
	}

	m.AppendMetadata(context.Background(), metadata)

	require.Len(t, c.receivedMetadata, numMetadata)
	// One more write than the rounded quotient should be performed in order to get samples that didn't
	// fit into MaxSamplesPerSend.
	require.Equal(t, numMetadata/config.DefaultMetadataConfig.MaxSamplesPerSend+1, c.writesReceived)
	// Make sure the last samples were sent.
	require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric)
}
|
|
|
// TestSampleDeliveryTimeout verifies that a batch smaller than
// MaxSamplesPerSend is still flushed once BatchSendDeadline elapses.
func TestSampleDeliveryTimeout(t *testing.T) {
	// Let's send one less sample than batch size, and wait the timeout duration
	n := 9
	samples, series := createTimeseries(n, n)

	cfg := config.DefaultQueueConfig
	cfg.MaxShards = 1
	cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)

	c := NewTestWriteClient()
	m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c)
	m.StoreSeries(series, 0)
	m.Start()
	defer m.Stop()

	// Send the samples twice, waiting for the samples in the meantime.
	c.expectSamples(samples, series)
	m.Append(samples)
	c.waitForExpectedData(t)

	c.expectSamples(samples, series)
	m.Append(samples)
	c.waitForExpectedData(t)
}
|
|
|
// TestSampleDeliveryOrder verifies that samples interleaved across many
// series are all delivered; waitForExpectedData asserts per-series ordering.
func TestSampleDeliveryOrder(t *testing.T) {
	ts := 10
	n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
	samples := make([]record.RefSample, 0, n)
	series := make([]record.RefSeries, 0, n)
	for i := 0; i < n; i++ {
		// i%ts cycles sample refs through ts distinct metric names.
		name := fmt.Sprintf("test_metric_%d", i%ts)
		samples = append(samples, record.RefSample{
			Ref: chunks.HeadSeriesRef(i),
			T:   int64(i),
			V:   float64(i),
		})
		series = append(series, record.RefSeries{
			Ref:    chunks.HeadSeriesRef(i),
			Labels: labels.FromStrings("__name__", name),
		})
	}

	c, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
	c.expectSamples(samples, series)
	m.StoreSeries(series, 0)

	m.Start()
	defer m.Stop()
	// These should be received by the client.
	m.Append(samples)
	c.waitForExpectedData(t)
}
|
|
|
// TestShutdown verifies that Stop honors the flush deadline when the client
// blocks forever: shutdown must take at least `deadline` but not much more.
func TestShutdown(t *testing.T) {
	deadline := 1 * time.Second
	c := NewTestBlockedWriteClient()

	cfg := config.DefaultQueueConfig
	mcfg := config.DefaultMetadataConfig

	m := newTestQueueManager(t, cfg, mcfg, deadline, c)
	n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
	samples, series := createTimeseries(n, n)
	m.StoreSeries(series, 0)
	m.Start()

	// Append blocks to guarantee delivery, so we do it in the background.
	go func() {
		m.Append(samples)
	}()
	time.Sleep(100 * time.Millisecond)

	// Test to ensure that Stop doesn't block.
	start := time.Now()
	m.Stop()
	// The samples will never be delivered, so duration should
	// be at least equal to deadline, otherwise the flush deadline
	// was not respected.
	duration := time.Since(start)
	// Allow 10% slack above the deadline before declaring a hang.
	if duration > deadline+(deadline/10) {
		t.Errorf("Took too long to shutdown: %s > %s", duration, deadline)
	}
	if duration < deadline {
		t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline)
	}
}
|
|
|
// TestSeriesReset verifies that SeriesReset drops the label cache entries for
// all segments strictly below the given segment index.
func TestSeriesReset(t *testing.T) {
	c := NewTestBlockedWriteClient()
	deadline := 5 * time.Second
	numSegments := 4
	numSeries := 25

	cfg := config.DefaultQueueConfig
	mcfg := config.DefaultMetadataConfig
	m := newTestQueueManager(t, cfg, mcfg, deadline, c)

	for i := 0; i < numSegments; i++ {
		series := []record.RefSeries{}
		for j := 0; j < numSeries; j++ {
			// (i*100)+j keeps refs unique across segments.
			series = append(series, record.RefSeries{Ref: chunks.HeadSeriesRef((i * 100) + j), Labels: labels.FromStrings("a", "a")})
		}
		m.StoreSeries(series, i)
	}
	require.Len(t, m.seriesLabels, numSegments*numSeries)
	// Resetting at segment 2 should discard the series of segments 0 and 1.
	m.SeriesReset(2)
	require.Len(t, m.seriesLabels, numSegments*numSeries/2)
}
|
|
|
// TestReshard verifies that no samples are lost while the shard count is
// repeatedly changed concurrently with appends.
func TestReshard(t *testing.T) {
	size := 10 // Make bigger to find more races.
	nSeries := 6
	nSamples := config.DefaultQueueConfig.Capacity * size
	samples, series := createTimeseries(nSamples, nSeries)

	cfg := config.DefaultQueueConfig
	cfg.MaxShards = 1

	c := NewTestWriteClient()
	m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c)
	c.expectSamples(samples, series)
	m.StoreSeries(series, 0)

	m.Start()
	defer m.Stop()

	// Feed samples one queue-capacity chunk at a time in the background.
	go func() {
		for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity {
			sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity])
			require.True(t, sent, "samples not sent")
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Meanwhile restart the shards with an increasing shard count.
	for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
		m.shards.stop()
		m.shards.start(i)
		time.Sleep(100 * time.Millisecond)
	}

	c.waitForExpectedData(t)
}
|
|
|
// TestReshardRaceWithStop races reshard requests against Start/Stop cycles.
// The mutex hand-off guarantees `m` points at a started manager whenever the
// main goroutine pushes onto reshardChan; the race detector catches the rest.
func TestReshardRaceWithStop(t *testing.T) {
	c := NewTestWriteClient()
	var m *QueueManager
	h := sync.Mutex{}

	h.Lock()

	cfg := config.DefaultQueueConfig
	mcfg := config.DefaultMetadataConfig
	exitCh := make(chan struct{})
	go func() {
		for {
			m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c)
			m.Start()
			// Hand the running manager to the main goroutine, then wait for
			// it to finish before stopping and looping again.
			h.Unlock()
			h.Lock()
			m.Stop()

			// Exit only once the main goroutine is ready to receive.
			select {
			case exitCh <- struct{}{}:
				return
			default:
			}
		}
	}()

	for i := 1; i < 100; i++ {
		h.Lock()
		m.reshardChan <- i
		h.Unlock()
	}
	<-exitCh
}
|
|
|
// TestReshardPartialBatch checks that stopping/restarting shards while a
// partial batch is pending (client blocked) does not deadlock.
func TestReshardPartialBatch(t *testing.T) {
	samples, series := createTimeseries(1, 10)

	c := NewTestBlockedWriteClient()

	cfg := config.DefaultQueueConfig
	mcfg := config.DefaultMetadataConfig
	cfg.MaxShards = 1
	// Tiny deadlines maximize the chance of hitting the batch-timeout path
	// at the same moment the shards are torn down.
	batchSendDeadline := time.Millisecond
	flushDeadline := 10 * time.Millisecond
	cfg.BatchSendDeadline = model.Duration(batchSendDeadline)

	m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c)
	m.StoreSeries(series, 0)

	m.Start()

	for i := 0; i < 100; i++ {
		done := make(chan struct{})
		go func() {
			m.Append(samples)
			time.Sleep(batchSendDeadline)
			m.shards.stop()
			m.shards.start(1)
			done <- struct{}{}
		}()
		select {
		case <-done:
		case <-time.After(2 * time.Second):
			t.Error("Deadlock between sending and stopping detected")
			pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
			t.FailNow()
		}
	}
	// We can only call stop if there was not a deadlock.
	m.Stop()
}
|
|
|
// TestQueueFilledDeadlock makes sure the code does not deadlock in the case
// where a large scrape (> capacity + max samples per send) is appended at the
// same time as a batch times out according to the batch send deadline.
func TestQueueFilledDeadlock(t *testing.T) {
	// 50 samples > Capacity (20) + MaxSamplesPerSend (10), matching the
	// scenario described above.
	samples, series := createTimeseries(50, 1)

	c := NewNopWriteClient()

	cfg := config.DefaultQueueConfig
	mcfg := config.DefaultMetadataConfig
	cfg.MaxShards = 1
	cfg.MaxSamplesPerSend = 10
	cfg.Capacity = 20
	flushDeadline := time.Second
	batchSendDeadline := time.Millisecond
	cfg.BatchSendDeadline = model.Duration(batchSendDeadline)

	m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c)
	m.StoreSeries(series, 0)
	m.Start()
	defer m.Stop()

	for i := 0; i < 100; i++ {
		done := make(chan struct{})
		go func() {
			// Sleep so the append collides with a batch timeout.
			time.Sleep(batchSendDeadline)
			m.Append(samples)
			done <- struct{}{}
		}()
		select {
		case <-done:
		case <-time.After(2 * time.Second):
			t.Error("Deadlock between sending and appending detected")
			pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
			t.FailNow()
		}
	}
}
|
|
|
// TestReleaseNoninternedString verifies that repeatedly storing and resetting
// series never releases a string that was not interned first, as counted by
// the noReferenceReleases metric.
func TestReleaseNoninternedString(t *testing.T) {
	_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
	m.Start()
	defer m.Stop()

	for i := 1; i < 1000; i++ {
		m.StoreSeries([]record.RefSeries{
			{
				Ref:    chunks.HeadSeriesRef(i),
				Labels: labels.FromStrings("asdf", strconv.Itoa(i)),
			},
		}, 0)
		m.SeriesReset(1)
	}

	metric := client_testutil.ToFloat64(noReferenceReleases)
	require.Equal(t, 0.0, metric, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
}
|
|
|
// TestShouldReshard checks the shouldReshard decision against the recency of
// the last successful send: stale sends suppress resharding, fresh ones allow it.
func TestShouldReshard(t *testing.T) {
	type testcase struct {
		startingShards                           int
		samplesIn, samplesOut, lastSendTimestamp int64
		expectedToReshard                        bool
	}
	cases := []testcase{
		{
			// Resharding shouldn't take place if the last successful send was > batch send deadline*2 seconds ago.
			startingShards:    10,
			samplesIn:         1000,
			samplesOut:        10,
			lastSendTimestamp: time.Now().Unix() - int64(3*time.Duration(config.DefaultQueueConfig.BatchSendDeadline)/time.Second),
			expectedToReshard: false,
		},
		{
			startingShards:    5,
			samplesIn:         1000,
			samplesOut:        10,
			lastSendTimestamp: time.Now().Unix(),
			expectedToReshard: true,
		},
	}

	for _, c := range cases {
		_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
		// Seed internal state before starting so calculateDesiredShards sees
		// the desired in/out imbalance.
		m.numShards = c.startingShards
		m.dataIn.incr(c.samplesIn)
		m.dataOut.incr(c.samplesOut)
		m.lastSendTimestamp.Store(c.lastSendTimestamp)

		m.Start()

		desiredShards := m.calculateDesiredShards()
		shouldReshard := m.shouldReshard(desiredShards)

		m.Stop()

		require.Equal(t, c.expectedToReshard, shouldReshard)
	}
}
|
|
|
// TestDisableReshardOnRetry asserts that resharding should be disabled when a |
|
// recoverable error is returned from remote_write. |
|
func TestDisableReshardOnRetry(t *testing.T) { |
|
onStoredContext, onStoreCalled := context.WithCancel(context.Background()) |
|
defer onStoreCalled() |
|
|
|
var ( |
|
fakeSamples, fakeSeries = createTimeseries(100, 100) |
|
|
|
cfg = config.DefaultQueueConfig |
|
mcfg = config.DefaultMetadataConfig |
|
retryAfter = time.Second |
|
|
|
metrics = newQueueManagerMetrics(nil, "", "") |
|
|
|
client = &MockWriteClient{ |
|
StoreFunc: func(ctx context.Context, b []byte, i int) error { |
|
onStoreCalled() |
|
|
|
return RecoverableError{ |
|
error: fmt.Errorf("fake error"), |
|
retryAfter: model.Duration(retryAfter), |
|
} |
|
}, |
|
NameFunc: func() string { return "mock" }, |
|
EndpointFunc: func() string { return "http://fake:9090/api/v1/write" }, |
|
} |
|
) |
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
m.StoreSeries(fakeSeries, 0) |
|
|
|
// Attempt to samples while the manager is running. We immediately stop the |
|
// manager after the recoverable error is generated to prevent the manager |
|
// from resharding itself. |
|
m.Start() |
|
{ |
|
m.Append(fakeSamples) |
|
|
|
select { |
|
case <-onStoredContext.Done(): |
|
case <-time.After(time.Minute): |
|
require.FailNow(t, "timed out waiting for client to be sent metrics") |
|
} |
|
} |
|
m.Stop() |
|
|
|
require.Eventually(t, func() bool { |
|
// Force m.lastSendTimestamp to be current so the last send timestamp isn't |
|
// the reason resharding is disabled. |
|
m.lastSendTimestamp.Store(time.Now().Unix()) |
|
return m.shouldReshard(m.numShards+1) == false |
|
}, time.Minute, 10*time.Millisecond, "shouldReshard was never disabled") |
|
|
|
// After 2x retryAfter, resharding should be enabled again. |
|
require.Eventually(t, func() bool { |
|
// Force m.lastSendTimestamp to be current so the last send timestamp isn't |
|
// the reason resharding is disabled. |
|
m.lastSendTimestamp.Store(time.Now().Unix()) |
|
return m.shouldReshard(m.numShards+1) == true |
|
}, time.Minute, retryAfter, "shouldReshard should have been re-enabled") |
|
} |
|
|
|
// createTimeseries generates numSeries series named test_metric_<i>, each
// with numSamples samples (T=j, V=i), optionally carrying extraLabels.
// The sample Ref equals the series index so tests can map samples to series.
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
	samples := make([]record.RefSample, 0, numSamples)
	series := make([]record.RefSeries, 0, numSeries)
	lb := labels.NewScratchBuilder(1 + len(extraLabels))
	for i := 0; i < numSeries; i++ {
		name := fmt.Sprintf("test_metric_%d", i)
		for j := 0; j < numSamples; j++ {
			samples = append(samples, record.RefSample{
				Ref: chunks.HeadSeriesRef(i),
				T:   int64(j),
				V:   float64(i),
			})
		}
		// Create Labels that is name of series plus any extra labels supplied.
		lb.Reset()
		lb.Add(labels.MetricName, name)
		for _, l := range extraLabels {
			lb.Add(l.Name, l.Value)
		}
		lb.Sort()
		series = append(series, record.RefSeries{
			Ref:    chunks.HeadSeriesRef(i),
			Labels: lb.Labels(),
		})
	}
	return samples, series
}
|
|
|
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) { |
|
exemplars := make([]record.RefExemplar, 0, numExemplars) |
|
series := make([]record.RefSeries, 0, numSeries) |
|
for i := 0; i < numSeries; i++ { |
|
name := fmt.Sprintf("test_metric_%d", i) |
|
for j := 0; j < numExemplars; j++ { |
|
e := record.RefExemplar{ |
|
Ref: chunks.HeadSeriesRef(i), |
|
T: int64(j), |
|
V: float64(i), |
|
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)), |
|
} |
|
exemplars = append(exemplars, e) |
|
} |
|
series = append(series, record.RefSeries{ |
|
Ref: chunks.HeadSeriesRef(i), |
|
Labels: labels.FromStrings("__name__", name), |
|
}) |
|
} |
|
return exemplars, series |
|
} |
|
|
|
func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.RefHistogramSample, []record.RefFloatHistogramSample, []record.RefSeries) { |
|
histograms := make([]record.RefHistogramSample, 0, numSamples) |
|
floatHistograms := make([]record.RefFloatHistogramSample, 0, numSamples) |
|
series := make([]record.RefSeries, 0, numSeries) |
|
for i := 0; i < numSeries; i++ { |
|
name := fmt.Sprintf("test_metric_%d", i) |
|
for j := 0; j < numSamples; j++ { |
|
hist := &histogram.Histogram{ |
|
Schema: 2, |
|
ZeroThreshold: 1e-128, |
|
ZeroCount: 0, |
|
Count: 2, |
|
Sum: 0, |
|
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, |
|
PositiveBuckets: []int64{int64(i) + 1}, |
|
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, |
|
NegativeBuckets: []int64{int64(-i) - 1}, |
|
} |
|
|
|
if floatHistogram { |
|
fh := record.RefFloatHistogramSample{ |
|
Ref: chunks.HeadSeriesRef(i), |
|
T: int64(j), |
|
FH: hist.ToFloat(nil), |
|
} |
|
floatHistograms = append(floatHistograms, fh) |
|
} else { |
|
h := record.RefHistogramSample{ |
|
Ref: chunks.HeadSeriesRef(i), |
|
T: int64(j), |
|
H: hist, |
|
} |
|
histograms = append(histograms, h) |
|
} |
|
} |
|
series = append(series, record.RefSeries{ |
|
Ref: chunks.HeadSeriesRef(i), |
|
Labels: labels.FromStrings("__name__", name), |
|
}) |
|
} |
|
if floatHistogram { |
|
return nil, floatHistograms, series |
|
} |
|
return histograms, nil, series |
|
} |
|
|
|
func getSeriesNameFromRef(r record.RefSeries) string { |
|
return r.Labels.Get("__name__") |
|
} |
|
|
|
// TestWriteClient is a WriteClient that records everything it receives and
// can be primed with expected data via the expect* methods; calls to
// waitForExpectedData block until all expected items have been stored.
type TestWriteClient struct {
	receivedSamples         map[string][]prompb.Sample
	expectedSamples         map[string][]prompb.Sample
	receivedExemplars       map[string][]prompb.Exemplar
	expectedExemplars       map[string][]prompb.Exemplar
	receivedHistograms      map[string][]prompb.Histogram
	receivedFloatHistograms map[string][]prompb.Histogram
	expectedHistograms      map[string][]prompb.Histogram
	expectedFloatHistograms map[string][]prompb.Histogram
	receivedMetadata        map[string][]prompb.MetricMetadata
	writesReceived          int
	withWaitGroup           bool // when false, expect*/waitForExpectedData are no-ops
	wg                      sync.WaitGroup
	mtx                     sync.Mutex
	buf                     []byte // reusable snappy decode buffer
}
|
|
|
func NewTestWriteClient() *TestWriteClient { |
|
return &TestWriteClient{ |
|
withWaitGroup: true, |
|
receivedSamples: map[string][]prompb.Sample{}, |
|
expectedSamples: map[string][]prompb.Sample{}, |
|
receivedMetadata: map[string][]prompb.MetricMetadata{}, |
|
} |
|
} |
|
|
|
func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { |
|
if !c.withWaitGroup { |
|
return |
|
} |
|
c.mtx.Lock() |
|
defer c.mtx.Unlock() |
|
|
|
c.expectedSamples = map[string][]prompb.Sample{} |
|
c.receivedSamples = map[string][]prompb.Sample{} |
|
|
|
for _, s := range ss { |
|
seriesName := getSeriesNameFromRef(series[s.Ref]) |
|
c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{ |
|
Timestamp: s.T, |
|
Value: s.V, |
|
}) |
|
} |
|
c.wg.Add(len(ss)) |
|
} |
|
|
|
func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []record.RefSeries) { |
|
if !c.withWaitGroup { |
|
return |
|
} |
|
c.mtx.Lock() |
|
defer c.mtx.Unlock() |
|
|
|
c.expectedExemplars = map[string][]prompb.Exemplar{} |
|
c.receivedExemplars = map[string][]prompb.Exemplar{} |
|
|
|
for _, s := range ss { |
|
seriesName := getSeriesNameFromRef(series[s.Ref]) |
|
e := prompb.Exemplar{ |
|
Labels: LabelsToLabelsProto(s.Labels, nil), |
|
Timestamp: s.T, |
|
Value: s.V, |
|
} |
|
c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e) |
|
} |
|
c.wg.Add(len(ss)) |
|
} |
|
|
|
func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, series []record.RefSeries) { |
|
if !c.withWaitGroup { |
|
return |
|
} |
|
c.mtx.Lock() |
|
defer c.mtx.Unlock() |
|
|
|
c.expectedHistograms = map[string][]prompb.Histogram{} |
|
c.receivedHistograms = map[string][]prompb.Histogram{} |
|
|
|
for _, h := range hh { |
|
seriesName := getSeriesNameFromRef(series[h.Ref]) |
|
c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H)) |
|
} |
|
c.wg.Add(len(hh)) |
|
} |
|
|
|
func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSample, series []record.RefSeries) { |
|
if !c.withWaitGroup { |
|
return |
|
} |
|
c.mtx.Lock() |
|
defer c.mtx.Unlock() |
|
|
|
c.expectedFloatHistograms = map[string][]prompb.Histogram{} |
|
c.receivedFloatHistograms = map[string][]prompb.Histogram{} |
|
|
|
for _, fh := range fhs { |
|
seriesName := getSeriesNameFromRef(series[fh.Ref]) |
|
c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH)) |
|
} |
|
c.wg.Add(len(fhs)) |
|
} |
|
|
|
func (c *TestWriteClient) waitForExpectedData(tb testing.TB) { |
|
if !c.withWaitGroup { |
|
return |
|
} |
|
c.wg.Wait() |
|
c.mtx.Lock() |
|
defer c.mtx.Unlock() |
|
for ts, expectedSamples := range c.expectedSamples { |
|
require.Equal(tb, expectedSamples, c.receivedSamples[ts], ts) |
|
} |
|
for ts, expectedExemplar := range c.expectedExemplars { |
|
require.Equal(tb, expectedExemplar, c.receivedExemplars[ts], ts) |
|
} |
|
for ts, expectedHistogram := range c.expectedHistograms { |
|
require.Equal(tb, expectedHistogram, c.receivedHistograms[ts], ts) |
|
} |
|
for ts, expectedFloatHistogram := range c.expectedFloatHistograms { |
|
require.Equal(tb, expectedFloatHistogram, c.receivedFloatHistograms[ts], ts) |
|
} |
|
} |
|
|
|
func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { |
|
c.mtx.Lock() |
|
defer c.mtx.Unlock() |
|
// nil buffers are ok for snappy, ignore cast error. |
|
if c.buf != nil { |
|
c.buf = c.buf[:cap(c.buf)] |
|
} |
|
reqBuf, err := snappy.Decode(c.buf, req) |
|
c.buf = reqBuf |
|
if err != nil { |
|
return err |
|
} |
|
|
|
var reqProto prompb.WriteRequest |
|
if err := proto.Unmarshal(reqBuf, &reqProto); err != nil { |
|
return err |
|
} |
|
builder := labels.NewScratchBuilder(0) |
|
count := 0 |
|
for _, ts := range reqProto.Timeseries { |
|
labels := LabelProtosToLabels(&builder, ts.Labels) |
|
seriesName := labels.Get("__name__") |
|
for _, sample := range ts.Samples { |
|
count++ |
|
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample) |
|
} |
|
|
|
for _, ex := range ts.Exemplars { |
|
count++ |
|
c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex) |
|
} |
|
|
|
for _, histogram := range ts.Histograms { |
|
count++ |
|
if histogram.IsFloatHistogram() { |
|
c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], histogram) |
|
} else { |
|
c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram) |
|
} |
|
} |
|
} |
|
if c.withWaitGroup { |
|
c.wg.Add(-count) |
|
} |
|
|
|
for _, m := range reqProto.Metadata { |
|
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m) |
|
} |
|
|
|
c.writesReceived++ |
|
|
|
return nil |
|
} |
|
|
|
// Name implements WriteClient.
func (c *TestWriteClient) Name() string {
	return "testwriteclient"
}
|
|
|
// Endpoint implements WriteClient; the URL is never dialed by tests.
func (c *TestWriteClient) Endpoint() string {
	return "http://test-remote.com/1234"
}
|
|
|
// TestBlockingWriteClient is a queue_manager WriteClient which will block
// on any calls to Store(), until the request's Context is cancelled, at which
// point the `numCalls` property will contain a count of how many times Store()
// was called.
type TestBlockingWriteClient struct {
	numCalls atomic.Uint64 // number of Store invocations so far
}
|
|
|
// NewTestBlockedWriteClient returns a ready-to-use TestBlockingWriteClient.
func NewTestBlockedWriteClient() *TestBlockingWriteClient {
	return &TestBlockingWriteClient{}
}
|
|
|
// Store implements WriteClient: it counts the call, then blocks until the
// request context is cancelled (e.g. by the flush deadline during Stop).
func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) error {
	c.numCalls.Inc()
	<-ctx.Done()
	return nil
}
|
|
|
// NumCalls returns how many times Store has been invoked.
func (c *TestBlockingWriteClient) NumCalls() uint64 {
	return c.numCalls.Load()
}
|
|
|
// Name implements WriteClient.
func (c *TestBlockingWriteClient) Name() string {
	return "testblockingwriteclient"
}
|
|
|
// Endpoint implements WriteClient; the URL is never dialed by tests.
func (c *TestBlockingWriteClient) Endpoint() string {
	return "http://test-remote-blocking.com/1234"
}
|
|
|
// For benchmarking the send and not the receive side.
type NopWriteClient struct{}

// NewNopWriteClient returns a WriteClient that discards everything.
func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} }

// Store drops the request so benchmarks measure only the send path.
func (c *NopWriteClient) Store(context.Context, []byte, int) error { return nil }
func (c *NopWriteClient) Name() string                             { return "nopwriteclient" }
func (c *NopWriteClient) Endpoint() string                         { return "http://test-remote.com/1234" }
|
|
|
// MockWriteClient is a WriteClient whose behavior is supplied via function
// fields, letting tests inject arbitrary Store/Name/Endpoint behavior.
// All three funcs must be set before use; calls panic on a nil func.
type MockWriteClient struct {
	StoreFunc    func(context.Context, []byte, int) error
	NameFunc     func() string
	EndpointFunc func() string
}

// Store delegates to StoreFunc.
func (c *MockWriteClient) Store(ctx context.Context, bb []byte, n int) error {
	return c.StoreFunc(ctx, bb, n)
}

// Name delegates to NameFunc.
func (c *MockWriteClient) Name() string { return c.NameFunc() }

// Endpoint delegates to EndpointFunc.
func (c *MockWriteClient) Endpoint() string { return c.EndpointFunc() }
|
|
|
// Extra labels to make a more realistic workload - taken from Kubernetes' embedded cAdvisor metrics. |
|
var extraLabels []labels.Label = []labels.Label{ |
|
{Name: "kubernetes_io_arch", Value: "amd64"}, |
|
{Name: "kubernetes_io_instance_type", Value: "c3.somesize"}, |
|
{Name: "kubernetes_io_os", Value: "linux"}, |
|
{Name: "container_name", Value: "some-name"}, |
|
{Name: "failure_domain_kubernetes_io_region", Value: "somewhere-1"}, |
|
{Name: "failure_domain_kubernetes_io_zone", Value: "somewhere-1b"}, |
|
{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"}, |
|
{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"}, |
|
{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"}, |
|
{Name: "job", Value: "kubernetes-cadvisor"}, |
|
{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"}, |
|
{Name: "monitor", Value: "prod"}, |
|
{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"}, |
|
{Name: "namespace", Value: "kube-system"}, |
|
{Name: "pod_name", Value: "some-other-name-5j8s8"}, |
|
} |
|
|
|
// BenchmarkSampleSend measures the cost of Append plus the simulated WAL
// garbage-collection cycle, using a no-op client so only the send side is timed.
func BenchmarkSampleSend(b *testing.B) {
	// Send one sample per series, which is the typical remote_write case
	const numSamples = 1
	const numSeries = 10000

	samples, series := createTimeseries(numSamples, numSeries, extraLabels...)

	c := NewNopWriteClient()

	cfg := config.DefaultQueueConfig
	mcfg := config.DefaultMetadataConfig
	cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
	// Fixed shard count keeps resharding out of the measurement.
	cfg.MinShards = 20
	cfg.MaxShards = 20

	m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c)
	m.StoreSeries(series, 0)

	// These should be received by the client.
	m.Start()
	defer m.Stop()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		m.Append(samples)
		m.UpdateSeriesSegment(series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does
		m.SeriesReset(i + 1)
	}
	// Do not include shutdown
	b.StopTimer()
}
|
|
|
// Check how long it takes to add N series, including external labels processing.
func BenchmarkStoreSeries(b *testing.B) {
	externalLabels := []labels.Label{
		{Name: "cluster", Value: "mycluster"},
		{Name: "replica", Value: "1"},
	}
	relabelConfigs := []*relabel.Config{{
		SourceLabels: model.LabelNames{"namespace"},
		Separator:    ";",
		Regex:        relabel.MustNewRegexp("kube.*"),
		TargetLabel:  "job",
		Replacement:  "$1",
		Action:       relabel.Replace,
	}}
	// Sub-benchmarks isolate the cost of external labels and relabeling,
	// separately and combined.
	testCases := []struct {
		name           string
		externalLabels []labels.Label
		ts             []prompb.TimeSeries
		relabelConfigs []*relabel.Config
	}{
		{name: "plain"},
		{name: "externalLabels", externalLabels: externalLabels},
		{name: "relabel", relabelConfigs: relabelConfigs},
		{
			name:           "externalLabels+relabel",
			externalLabels: externalLabels,
			relabelConfigs: relabelConfigs,
		},
	}

	// numSeries chosen to be big enough that StoreSeries dominates creating a new queue manager.
	const numSeries = 1000
	_, series := createTimeseries(0, numSeries, extraLabels...)

	for _, tc := range testCases {
		b.Run(tc.name, func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				// A fresh manager per iteration so StoreSeries always starts
				// from an empty cache.
				c := NewTestWriteClient()
				dir := b.TempDir()
				cfg := config.DefaultQueueConfig
				mcfg := config.DefaultMetadataConfig
				metrics := newQueueManagerMetrics(nil, "", "")
				m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false)
				m.externalLabels = tc.externalLabels
				m.relabelConfigs = tc.relabelConfigs

				m.StoreSeries(series, 0)
			}
		})
	}
}
|
|
|
func BenchmarkStartup(b *testing.B) { |
|
dir := os.Getenv("WALDIR") |
|
if dir == "" { |
|
b.Skip("WALDIR env var not set") |
|
} |
|
|
|
// Find the second largest segment; we will replay up to this. |
|
// (Second largest as WALWatcher will start tailing the largest). |
|
dirents, err := os.ReadDir(dir) |
|
require.NoError(b, err) |
|
|
|
var segments []int |
|
for _, dirent := range dirents { |
|
if i, err := strconv.Atoi(dirent.Name()); err != nil { |
|
segments = append(segments, i) |
|
} |
|
} |
|
sort.Ints(segments) |
|
|
|
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)) |
|
logger = log.With(logger, "caller", log.DefaultCaller) |
|
|
|
cfg := config.DefaultQueueConfig |
|
mcfg := config.DefaultMetadataConfig |
|
for n := 0; n < b.N; n++ { |
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
c := NewTestBlockedWriteClient() |
|
m := NewQueueManager(metrics, nil, nil, logger, dir, |
|
newEWMARate(ewmaWeight, shardUpdateDuration), |
|
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) |
|
m.watcher.MaxSegment = segments[len(segments)-2] |
|
err := m.watcher.Run() |
|
require.NoError(b, err) |
|
} |
|
} |
|
|
|
func TestProcessExternalLabels(t *testing.T) { |
|
b := labels.NewBuilder(labels.EmptyLabels()) |
|
for i, tc := range []struct { |
|
labels labels.Labels |
|
externalLabels []labels.Label |
|
expected labels.Labels |
|
}{ |
|
// Test adding labels at the end. |
|
{ |
|
labels: labels.FromStrings("a", "b"), |
|
externalLabels: []labels.Label{{Name: "c", Value: "d"}}, |
|
expected: labels.FromStrings("a", "b", "c", "d"), |
|
}, |
|
|
|
// Test adding labels at the beginning. |
|
{ |
|
labels: labels.FromStrings("c", "d"), |
|
externalLabels: []labels.Label{{Name: "a", Value: "b"}}, |
|
expected: labels.FromStrings("a", "b", "c", "d"), |
|
}, |
|
|
|
// Test we don't override existing labels. |
|
{ |
|
labels: labels.FromStrings("a", "b"), |
|
externalLabels: []labels.Label{{Name: "a", Value: "c"}}, |
|
expected: labels.FromStrings("a", "b"), |
|
}, |
|
|
|
// Test empty externalLabels. |
|
{ |
|
labels: labels.FromStrings("a", "b"), |
|
externalLabels: []labels.Label{}, |
|
expected: labels.FromStrings("a", "b"), |
|
}, |
|
|
|
// Test empty labels. |
|
{ |
|
labels: labels.EmptyLabels(), |
|
externalLabels: []labels.Label{{Name: "a", Value: "b"}}, |
|
expected: labels.FromStrings("a", "b"), |
|
}, |
|
|
|
// Test labels is longer than externalLabels. |
|
{ |
|
labels: labels.FromStrings("a", "b", "c", "d"), |
|
externalLabels: []labels.Label{{Name: "e", Value: "f"}}, |
|
expected: labels.FromStrings("a", "b", "c", "d", "e", "f"), |
|
}, |
|
|
|
// Test externalLabels is longer than labels. |
|
{ |
|
labels: labels.FromStrings("c", "d"), |
|
externalLabels: []labels.Label{{Name: "a", Value: "b"}, {Name: "e", Value: "f"}}, |
|
expected: labels.FromStrings("a", "b", "c", "d", "e", "f"), |
|
}, |
|
|
|
// Adding with and without clashing labels. |
|
{ |
|
labels: labels.FromStrings("a", "b", "c", "d"), |
|
externalLabels: []labels.Label{{Name: "a", Value: "xxx"}, {Name: "c", Value: "yyy"}, {Name: "e", Value: "f"}}, |
|
expected: labels.FromStrings("a", "b", "c", "d", "e", "f"), |
|
}, |
|
} { |
|
b.Reset(tc.labels) |
|
processExternalLabels(b, tc.externalLabels) |
|
testutil.RequireEqual(t, tc.expected, b.Labels(), "test %d", i) |
|
} |
|
} |
|
|
|
// TestCalculateDesiredShards simulates a remote-write workload over virtual
// time: two minutes of startup where nothing is sent, then steady input with
// output throughput capped by the current shard count. It asserts the shard
// calculation stays within sane bounds and that the backlog is eventually
// drained.
func TestCalculateDesiredShards(t *testing.T) {
	cfg := config.DefaultQueueConfig
	_, m := newTestClientAndQueueManager(t, defaultFlushDeadline)
	samplesIn := m.dataIn

	// Need to start the queue manager so the proper metrics are initialized.
	// However we can stop it right away since we don't need to do any actual
	// processing.
	m.Start()
	m.Stop()

	inputRate := int64(50000) // Samples arriving per second in the simulation.
	var pendingSamples int64  // Running model of the queued-but-unsent backlog.

	// Two minute startup, no samples are sent.
	startedAt := time.Now().Add(-2 * time.Minute)

	// helper function for adding samples.
	addSamples := func(s int64, ts time.Duration) {
		pendingSamples += s
		samplesIn.incr(s)
		samplesIn.tick()

		// Advance the highest-received timestamp to the simulated wall clock.
		m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix()))
	}

	// helper function for sending samples.
	sendSamples := func(s int64, ts time.Duration) {
		pendingSamples -= s
		m.dataOut.incr(s)
		m.dataOutDuration.incr(int64(m.numShards) * int64(shardUpdateDuration))

		// highest sent is how far back pending samples would be at our input rate.
		highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
		m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix()))

		m.lastSendTimestamp.Store(time.Now().Unix())
	}

	ts := time.Duration(0)
	// Startup phase: samples come in but none go out; the calculation must
	// hold at a single shard.
	for ; ts < 120*time.Second; ts += shardUpdateDuration {
		addSamples(inputRate*int64(shardUpdateDuration/time.Second), ts)
		m.numShards = m.calculateDesiredShards()
		require.Equal(t, 1, m.numShards)
	}

	// Assume 100ms per request, or 10 requests per second per shard.
	// Shard calculation should never drop below barely keeping up.
	minShards := int(inputRate) / cfg.MaxSamplesPerSend / 10
	// This test should never go above 200 shards, that would be more resources than needed.
	maxShards := 200

	// Steady-state phase: send as much as the current shard count allows.
	for ; ts < 15*time.Minute; ts += shardUpdateDuration {
		sin := inputRate * int64(shardUpdateDuration/time.Second)
		addSamples(sin, ts)

		sout := int64(m.numShards*cfg.MaxSamplesPerSend) * int64(shardUpdateDuration/(100*time.Millisecond))
		// You can't send samples that don't exist so cap at the number of pending samples.
		if sout > pendingSamples {
			sout = pendingSamples
		}
		sendSamples(sout, ts)

		t.Log("desiredShards", m.numShards, "pendingSamples", pendingSamples)
		m.numShards = m.calculateDesiredShards()
		require.GreaterOrEqual(t, m.numShards, minShards, "Shards are too low. desiredShards=%d, minShards=%d, t_seconds=%d", m.numShards, minShards, ts/time.Second)
		require.LessOrEqual(t, m.numShards, maxShards, "Shards are too high. desiredShards=%d, maxShards=%d, t_seconds=%d", m.numShards, maxShards, ts/time.Second)
	}
	require.Equal(t, int64(0), pendingSamples, "Remote write never caught up, there are still %d pending samples.", pendingSamples)
}
|
|
|
func TestCalculateDesiredShardsDetail(t *testing.T) { |
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline) |
|
samplesIn := m.dataIn |
|
|
|
for _, tc := range []struct { |
|
name string |
|
prevShards int |
|
dataIn int64 // Quantities normalised to seconds. |
|
dataOut int64 |
|
dataDropped int64 |
|
dataOutDuration float64 |
|
backlog float64 |
|
expectedShards int |
|
}{ |
|
{ |
|
name: "nothing in or out 1", |
|
prevShards: 1, |
|
expectedShards: 1, // Shards stays the same. |
|
}, |
|
{ |
|
name: "nothing in or out 10", |
|
prevShards: 10, |
|
expectedShards: 10, // Shards stays the same. |
|
}, |
|
{ |
|
name: "steady throughput", |
|
prevShards: 1, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 1, |
|
expectedShards: 1, |
|
}, |
|
{ |
|
name: "scale down", |
|
prevShards: 10, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 5, |
|
expectedShards: 5, |
|
}, |
|
{ |
|
name: "scale down constrained", |
|
prevShards: 7, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 5, |
|
expectedShards: 7, |
|
}, |
|
{ |
|
name: "scale up", |
|
prevShards: 1, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 10, |
|
expectedShards: 10, |
|
}, |
|
{ |
|
name: "scale up constrained", |
|
prevShards: 8, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 10, |
|
expectedShards: 8, |
|
}, |
|
{ |
|
name: "backlogged 20s", |
|
prevShards: 2, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 2, |
|
backlog: 20, |
|
expectedShards: 4, |
|
}, |
|
{ |
|
name: "backlogged 90s", |
|
prevShards: 4, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 4, |
|
backlog: 90, |
|
expectedShards: 22, |
|
}, |
|
{ |
|
name: "backlog reduced", |
|
prevShards: 22, |
|
dataIn: 10, |
|
dataOut: 20, |
|
dataOutDuration: 4, |
|
backlog: 10, |
|
expectedShards: 3, |
|
}, |
|
{ |
|
name: "backlog eliminated", |
|
prevShards: 3, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 2, |
|
backlog: 0, |
|
expectedShards: 2, // Shard back down. |
|
}, |
|
{ |
|
name: "slight slowdown", |
|
prevShards: 1, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 1.2, |
|
expectedShards: 2, // 1.2 is rounded up to 2. |
|
}, |
|
{ |
|
name: "bigger slowdown", |
|
prevShards: 1, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 1.4, |
|
expectedShards: 2, |
|
}, |
|
{ |
|
name: "speed up", |
|
prevShards: 2, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 1.2, |
|
backlog: 0, |
|
expectedShards: 2, // No reaction - 1.2 is rounded up to 2. |
|
}, |
|
{ |
|
name: "speed up more", |
|
prevShards: 2, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 0.9, |
|
backlog: 0, |
|
expectedShards: 1, |
|
}, |
|
{ |
|
name: "marginal decision A", |
|
prevShards: 3, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 2.01, |
|
backlog: 0, |
|
expectedShards: 3, // 2.01 rounds up to 3. |
|
}, |
|
{ |
|
name: "marginal decision B", |
|
prevShards: 3, |
|
dataIn: 10, |
|
dataOut: 10, |
|
dataOutDuration: 1.99, |
|
backlog: 0, |
|
expectedShards: 2, // 1.99 rounds up to 2. |
|
}, |
|
} { |
|
t.Run(tc.name, func(t *testing.T) { |
|
m.numShards = tc.prevShards |
|
forceEMWA(samplesIn, tc.dataIn*int64(shardUpdateDuration/time.Second)) |
|
samplesIn.tick() |
|
forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second)) |
|
forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second)) |
|
forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration))) |
|
m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value. |
|
|
|
require.Equal(t, tc.expectedShards, m.calculateDesiredShards()) |
|
}) |
|
} |
|
} |
|
|
|
// forceEMWA primes an ewmaRate so its next tick() adopts exactly the given
// rate: clearing init and storing the event count bypasses the usual
// exponential smoothing. Test helper only.
func forceEMWA(r *ewmaRate, rate int64) {
	r.init = false
	r.newEvents.Store(rate)
}
|
|
|
func TestQueueManagerMetrics(t *testing.T) { |
|
reg := prometheus.NewPedanticRegistry() |
|
metrics := newQueueManagerMetrics(reg, "name", "http://localhost:1234") |
|
|
|
// Make sure metrics pass linting. |
|
problems, err := client_testutil.GatherAndLint(reg) |
|
require.NoError(t, err) |
|
require.Empty(t, problems, "Metric linting problems detected: %v", problems) |
|
|
|
// Make sure all metrics were unregistered. A failure here means you need |
|
// unregister a metric in `queueManagerMetrics.unregister()`. |
|
metrics.unregister() |
|
err = client_testutil.GatherAndCompare(reg, strings.NewReader("")) |
|
require.NoError(t, err) |
|
} |
|
|
|
func TestQueue_FlushAndShutdownDoesNotDeadlock(t *testing.T) { |
|
capacity := 100 |
|
batchSize := 10 |
|
queue := newQueue(batchSize, capacity) |
|
for i := 0; i < capacity+batchSize; i++ { |
|
queue.Append(timeSeries{}) |
|
} |
|
|
|
done := make(chan struct{}) |
|
go queue.FlushAndShutdown(done) |
|
go func() { |
|
// Give enough time for FlushAndShutdown to acquire the lock. queue.Batch() |
|
// should not block forever even if the lock is acquired. |
|
time.Sleep(10 * time.Millisecond) |
|
queue.Batch() |
|
close(done) |
|
}() |
|
select { |
|
case <-done: |
|
case <-time.After(2 * time.Second): |
|
t.Error("Deadlock in FlushAndShutdown detected") |
|
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) |
|
t.FailNow() |
|
} |
|
} |
|
|
|
func TestDropOldTimeSeries(t *testing.T) { |
|
size := 10 |
|
nSeries := 6 |
|
nSamples := config.DefaultQueueConfig.Capacity * size |
|
samples, newSamples, series := createTimeseriesWithOldSamples(nSamples, nSeries) |
|
|
|
c := NewTestWriteClient() |
|
c.expectSamples(newSamples, series) |
|
|
|
cfg := config.DefaultQueueConfig |
|
mcfg := config.DefaultMetadataConfig |
|
cfg.MaxShards = 1 |
|
cfg.SampleAgeLimit = model.Duration(60 * time.Second) |
|
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c) |
|
m.StoreSeries(series, 0) |
|
|
|
m.Start() |
|
defer m.Stop() |
|
|
|
m.Append(samples) |
|
c.waitForExpectedData(t) |
|
} |
|
|
|
func TestIsSampleOld(t *testing.T) { |
|
currentTime := time.Now() |
|
require.True(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-61*time.Second)))) |
|
require.False(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second)))) |
|
} |
|
|
|
func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) { |
|
newSamples := make([]record.RefSample, 0, numSamples) |
|
samples := make([]record.RefSample, 0, numSamples) |
|
series := make([]record.RefSeries, 0, numSeries) |
|
lb := labels.NewScratchBuilder(1 + len(extraLabels)) |
|
for i := 0; i < numSeries; i++ { |
|
name := fmt.Sprintf("test_metric_%d", i) |
|
// We create half of the samples in the past. |
|
past := timestamp.FromTime(time.Now().Add(-5 * time.Minute)) |
|
for j := 0; j < numSamples/2; j++ { |
|
samples = append(samples, record.RefSample{ |
|
Ref: chunks.HeadSeriesRef(i), |
|
T: past + int64(j), |
|
V: float64(i), |
|
}) |
|
} |
|
for j := 0; j < numSamples/2; j++ { |
|
sample := record.RefSample{ |
|
Ref: chunks.HeadSeriesRef(i), |
|
T: int64(int(time.Now().UnixMilli()) + j), |
|
V: float64(i), |
|
} |
|
samples = append(samples, sample) |
|
newSamples = append(newSamples, sample) |
|
} |
|
// Create Labels that is name of series plus any extra labels supplied. |
|
lb.Reset() |
|
lb.Add(labels.MetricName, name) |
|
for _, l := range extraLabels { |
|
lb.Add(l.Name, l.Value) |
|
} |
|
lb.Sort() |
|
series = append(series, record.RefSeries{ |
|
Ref: chunks.HeadSeriesRef(i), |
|
Labels: lb.Labels(), |
|
}) |
|
} |
|
return samples, newSamples, series |
|
} |
|
|
|
func filterTsLimit(limit int64, ts prompb.TimeSeries) bool { |
|
return limit > ts.Samples[0].Timestamp |
|
} |
|
|
|
func TestBuildTimeSeries(t *testing.T) { |
|
testCases := []struct { |
|
name string |
|
ts []prompb.TimeSeries |
|
filter func(ts prompb.TimeSeries) bool |
|
lowestTs int64 |
|
highestTs int64 |
|
droppedSamples int |
|
responseLen int |
|
}{ |
|
{ |
|
name: "No filter applied", |
|
ts: []prompb.TimeSeries{ |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567890, |
|
Value: 1.23, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567891, |
|
Value: 2.34, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567892, |
|
Value: 3.34, |
|
}, |
|
}, |
|
}, |
|
}, |
|
filter: nil, |
|
responseLen: 3, |
|
lowestTs: 1234567890, |
|
highestTs: 1234567892, |
|
}, |
|
{ |
|
name: "Filter applied, samples in order", |
|
ts: []prompb.TimeSeries{ |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567890, |
|
Value: 1.23, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567891, |
|
Value: 2.34, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567892, |
|
Value: 3.45, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567893, |
|
Value: 3.45, |
|
}, |
|
}, |
|
}, |
|
}, |
|
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) }, |
|
responseLen: 2, |
|
lowestTs: 1234567892, |
|
highestTs: 1234567893, |
|
droppedSamples: 2, |
|
}, |
|
{ |
|
name: "Filter applied, samples out of order", |
|
ts: []prompb.TimeSeries{ |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567892, |
|
Value: 3.45, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567890, |
|
Value: 1.23, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567893, |
|
Value: 3.45, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567891, |
|
Value: 2.34, |
|
}, |
|
}, |
|
}, |
|
}, |
|
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) }, |
|
responseLen: 2, |
|
lowestTs: 1234567892, |
|
highestTs: 1234567893, |
|
droppedSamples: 2, |
|
}, |
|
{ |
|
name: "Filter applied, samples not consecutive", |
|
ts: []prompb.TimeSeries{ |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567890, |
|
Value: 1.23, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567892, |
|
Value: 3.45, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567895, |
|
Value: 6.78, |
|
}, |
|
}, |
|
}, |
|
{ |
|
Samples: []prompb.Sample{ |
|
{ |
|
Timestamp: 1234567897, |
|
Value: 6.78, |
|
}, |
|
}, |
|
}, |
|
}, |
|
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) }, |
|
responseLen: 2, |
|
lowestTs: 1234567895, |
|
highestTs: 1234567897, |
|
droppedSamples: 2, |
|
}, |
|
} |
|
|
|
// Run the test cases |
|
for _, tc := range testCases { |
|
t.Run(tc.name, func(t *testing.T) { |
|
highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter) |
|
require.NotNil(t, result) |
|
require.Len(t, result, tc.responseLen) |
|
require.Equal(t, tc.highestTs, highest) |
|
require.Equal(t, tc.lowestTs, lowest) |
|
require.Equal(t, tc.droppedSamples, droppedSamples) |
|
}) |
|
} |
|
}
|
|
|