|
|
@ -163,16 +163,23 @@ func TestSampleDelivery(t *testing.T) { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func TestMetadataDelivery(t *testing.T) { |
|
|
|
func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration) (*TestWriteClient, *QueueManager) { |
|
|
|
c := NewTestWriteClient() |
|
|
|
c := NewTestWriteClient() |
|
|
|
|
|
|
|
|
|
|
|
dir := t.TempDir() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
|
|
|
|
return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient) *QueueManager { |
|
|
|
|
|
|
|
dir := t.TempDir() |
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return m |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func TestMetadataDelivery(t *testing.T) { |
|
|
|
|
|
|
|
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline) |
|
|
|
m.Start() |
|
|
|
m.Start() |
|
|
|
defer m.Stop() |
|
|
|
defer m.Stop() |
|
|
|
|
|
|
|
|
|
|
@ -192,7 +199,7 @@ func TestMetadataDelivery(t *testing.T) { |
|
|
|
require.Len(t, c.receivedMetadata, numMetadata) |
|
|
|
require.Len(t, c.receivedMetadata, numMetadata) |
|
|
|
// One more write than the rounded qoutient should be performed in order to get samples that didn't
|
|
|
|
// One more write than the rounded qoutient should be performed in order to get samples that didn't
|
|
|
|
// fit into MaxSamplesPerSend.
|
|
|
|
// fit into MaxSamplesPerSend.
|
|
|
|
require.Equal(t, numMetadata/mcfg.MaxSamplesPerSend+1, c.writesReceived) |
|
|
|
require.Equal(t, numMetadata/config.DefaultMetadataConfig.MaxSamplesPerSend+1, c.writesReceived) |
|
|
|
// Make sure the last samples were sent.
|
|
|
|
// Make sure the last samples were sent.
|
|
|
|
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric) |
|
|
|
require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric) |
|
|
|
} |
|
|
|
} |
|
|
@ -201,17 +208,13 @@ func TestSampleDeliveryTimeout(t *testing.T) { |
|
|
|
// Let's send one less sample than batch size, and wait the timeout duration
|
|
|
|
// Let's send one less sample than batch size, and wait the timeout duration
|
|
|
|
n := 9 |
|
|
|
n := 9 |
|
|
|
samples, series := createTimeseries(n, n) |
|
|
|
samples, series := createTimeseries(n, n) |
|
|
|
c := NewTestWriteClient() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
|
|
|
|
cfg.MaxShards = 1 |
|
|
|
cfg.MaxShards = 1 |
|
|
|
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) |
|
|
|
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) |
|
|
|
|
|
|
|
|
|
|
|
dir := t.TempDir() |
|
|
|
c := NewTestWriteClient() |
|
|
|
|
|
|
|
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c) |
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
m.StoreSeries(series, 0) |
|
|
|
m.StoreSeries(series, 0) |
|
|
|
m.Start() |
|
|
|
m.Start() |
|
|
|
defer m.Stop() |
|
|
|
defer m.Stop() |
|
|
@ -244,16 +247,8 @@ func TestSampleDeliveryOrder(t *testing.T) { |
|
|
|
}) |
|
|
|
}) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
c := NewTestWriteClient() |
|
|
|
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline) |
|
|
|
c.expectSamples(samples, series) |
|
|
|
c.expectSamples(samples, series) |
|
|
|
|
|
|
|
|
|
|
|
dir := t.TempDir() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
|
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
m.StoreSeries(series, 0) |
|
|
|
m.StoreSeries(series, 0) |
|
|
|
|
|
|
|
|
|
|
|
m.Start() |
|
|
|
m.Start() |
|
|
@ -267,13 +262,10 @@ func TestShutdown(t *testing.T) { |
|
|
|
deadline := 1 * time.Second |
|
|
|
deadline := 1 * time.Second |
|
|
|
c := NewTestBlockedWriteClient() |
|
|
|
c := NewTestBlockedWriteClient() |
|
|
|
|
|
|
|
|
|
|
|
dir := t.TempDir() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
m := newTestQueueManager(t, cfg, mcfg, deadline, c) |
|
|
|
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend |
|
|
|
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend |
|
|
|
samples, series := createTimeseries(n, n) |
|
|
|
samples, series := createTimeseries(n, n) |
|
|
|
m.StoreSeries(series, 0) |
|
|
|
m.StoreSeries(series, 0) |
|
|
@ -306,12 +298,10 @@ func TestSeriesReset(t *testing.T) { |
|
|
|
numSegments := 4 |
|
|
|
numSegments := 4 |
|
|
|
numSeries := 25 |
|
|
|
numSeries := 25 |
|
|
|
|
|
|
|
|
|
|
|
dir := t.TempDir() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
m := newTestQueueManager(t, cfg, mcfg, deadline, c) |
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
for i := 0; i < numSegments; i++ { |
|
|
|
for i := 0; i < numSegments; i++ { |
|
|
|
series := []record.RefSeries{} |
|
|
|
series := []record.RefSeries{} |
|
|
|
for j := 0; j < numSeries; j++ { |
|
|
|
for j := 0; j < numSeries; j++ { |
|
|
@ -330,17 +320,12 @@ func TestReshard(t *testing.T) { |
|
|
|
nSamples := config.DefaultQueueConfig.Capacity * size |
|
|
|
nSamples := config.DefaultQueueConfig.Capacity * size |
|
|
|
samples, series := createTimeseries(nSamples, nSeries) |
|
|
|
samples, series := createTimeseries(nSamples, nSeries) |
|
|
|
|
|
|
|
|
|
|
|
c := NewTestWriteClient() |
|
|
|
|
|
|
|
c.expectSamples(samples, series) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
|
|
|
|
cfg.MaxShards = 1 |
|
|
|
cfg.MaxShards = 1 |
|
|
|
|
|
|
|
|
|
|
|
dir := t.TempDir() |
|
|
|
c := NewTestWriteClient() |
|
|
|
|
|
|
|
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c) |
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
c.expectSamples(samples, series) |
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
m.StoreSeries(series, 0) |
|
|
|
m.StoreSeries(series, 0) |
|
|
|
|
|
|
|
|
|
|
|
m.Start() |
|
|
|
m.Start() |
|
|
@ -363,7 +348,7 @@ func TestReshard(t *testing.T) { |
|
|
|
c.waitForExpectedData(t) |
|
|
|
c.waitForExpectedData(t) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func TestReshardRaceWithStop(*testing.T) { |
|
|
|
func TestReshardRaceWithStop(t *testing.T) { |
|
|
|
c := NewTestWriteClient() |
|
|
|
c := NewTestWriteClient() |
|
|
|
var m *QueueManager |
|
|
|
var m *QueueManager |
|
|
|
h := sync.Mutex{} |
|
|
|
h := sync.Mutex{} |
|
|
@ -375,8 +360,7 @@ func TestReshardRaceWithStop(*testing.T) { |
|
|
|
exitCh := make(chan struct{}) |
|
|
|
exitCh := make(chan struct{}) |
|
|
|
go func() { |
|
|
|
go func() { |
|
|
|
for { |
|
|
|
for { |
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c) |
|
|
|
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
m.Start() |
|
|
|
m.Start() |
|
|
|
h.Unlock() |
|
|
|
h.Unlock() |
|
|
|
h.Lock() |
|
|
|
h.Lock() |
|
|
@ -410,8 +394,7 @@ func TestReshardPartialBatch(t *testing.T) { |
|
|
|
flushDeadline := 10 * time.Millisecond |
|
|
|
flushDeadline := 10 * time.Millisecond |
|
|
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline) |
|
|
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline) |
|
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c) |
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
m.StoreSeries(series, 0) |
|
|
|
m.StoreSeries(series, 0) |
|
|
|
|
|
|
|
|
|
|
|
m.Start() |
|
|
|
m.Start() |
|
|
@ -454,9 +437,7 @@ func TestQueueFilledDeadlock(t *testing.T) { |
|
|
|
batchSendDeadline := time.Millisecond |
|
|
|
batchSendDeadline := time.Millisecond |
|
|
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline) |
|
|
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline) |
|
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c) |
|
|
|
|
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
m.StoreSeries(series, 0) |
|
|
|
m.StoreSeries(series, 0) |
|
|
|
m.Start() |
|
|
|
m.Start() |
|
|
|
defer m.Stop() |
|
|
|
defer m.Stop() |
|
|
@ -479,11 +460,7 @@ func TestQueueFilledDeadlock(t *testing.T) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func TestReleaseNoninternedString(t *testing.T) { |
|
|
|
func TestReleaseNoninternedString(t *testing.T) { |
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline) |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
|
|
|
|
c := NewTestWriteClient() |
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
m.Start() |
|
|
|
m.Start() |
|
|
|
defer m.Stop() |
|
|
|
defer m.Stop() |
|
|
|
|
|
|
|
|
|
|
@ -525,12 +502,8 @@ func TestShouldReshard(t *testing.T) { |
|
|
|
}, |
|
|
|
}, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
|
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
|
|
|
|
for _, c := range cases { |
|
|
|
for _, c := range cases { |
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline) |
|
|
|
client := NewTestWriteClient() |
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
m.numShards = c.startingShards |
|
|
|
m.numShards = c.startingShards |
|
|
|
m.dataIn.incr(c.samplesIn) |
|
|
|
m.dataIn.incr(c.samplesIn) |
|
|
|
m.dataOut.incr(c.samplesOut) |
|
|
|
m.dataOut.incr(c.samplesOut) |
|
|
@ -904,10 +877,7 @@ func BenchmarkSampleSend(b *testing.B) { |
|
|
|
cfg.MinShards = 20 |
|
|
|
cfg.MinShards = 20 |
|
|
|
cfg.MaxShards = 20 |
|
|
|
cfg.MaxShards = 20 |
|
|
|
|
|
|
|
|
|
|
|
dir := b.TempDir() |
|
|
|
m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c) |
|
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
m.StoreSeries(series, 0) |
|
|
|
m.StoreSeries(series, 0) |
|
|
|
|
|
|
|
|
|
|
|
// These should be received by the client.
|
|
|
|
// These should be received by the client.
|
|
|
@ -1083,15 +1053,9 @@ func TestProcessExternalLabels(t *testing.T) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func TestCalculateDesiredShards(t *testing.T) { |
|
|
|
func TestCalculateDesiredShards(t *testing.T) { |
|
|
|
c := NewTestWriteClient() |
|
|
|
|
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline) |
|
|
|
|
|
|
|
samplesIn := m.dataIn |
|
|
|
dir := t.TempDir() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
|
|
|
|
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) |
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Need to start the queue manager so the proper metrics are initialized.
|
|
|
|
// Need to start the queue manager so the proper metrics are initialized.
|
|
|
|
// However we can stop it right away since we don't need to do any actual
|
|
|
|
// However we can stop it right away since we don't need to do any actual
|
|
|
@ -1160,15 +1124,8 @@ func TestCalculateDesiredShards(t *testing.T) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func TestCalculateDesiredShardsDetail(t *testing.T) { |
|
|
|
func TestCalculateDesiredShardsDetail(t *testing.T) { |
|
|
|
c := NewTestWriteClient() |
|
|
|
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline) |
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
samplesIn := m.dataIn |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
dir := t.TempDir() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
|
|
|
|
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) |
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for _, tc := range []struct { |
|
|
|
for _, tc := range []struct { |
|
|
|
name string |
|
|
|
name string |
|
|
@ -1393,10 +1350,7 @@ func TestDropOldTimeSeries(t *testing.T) { |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
cfg.MaxShards = 1 |
|
|
|
cfg.MaxShards = 1 |
|
|
|
cfg.SampleAgeLimit = model.Duration(60 * time.Second) |
|
|
|
cfg.SampleAgeLimit = model.Duration(60 * time.Second) |
|
|
|
dir := t.TempDir() |
|
|
|
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c) |
|
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false) |
|
|
|
|
|
|
|
m.StoreSeries(series, 0) |
|
|
|
m.StoreSeries(series, 0) |
|
|
|
|
|
|
|
|
|
|
|
m.Start() |
|
|
|
m.Start() |
|
|
|