|
|
|
@ -149,7 +149,7 @@ func TestMetadataDelivery(t *testing.T) {
|
|
|
|
|
mcfg := config.DefaultMetadataConfig
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.Start()
|
|
|
|
|
defer m.Stop()
|
|
|
|
|
|
|
|
|
@ -188,7 +188,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
|
|
|
|
|
dir := t.TempDir()
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.StoreSeries(series, 0)
|
|
|
|
|
m.Start()
|
|
|
|
|
defer m.Stop()
|
|
|
|
@ -217,7 +217,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
|
|
|
|
})
|
|
|
|
|
series = append(series, record.RefSeries{
|
|
|
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
|
|
|
Labels: labels.Labels{labels.Label{Name: "__name__", Value: name}},
|
|
|
|
|
Labels: labels.FromStrings("__name__", name),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -230,7 +230,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
|
|
|
|
|
mcfg := config.DefaultMetadataConfig
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.StoreSeries(series, 0)
|
|
|
|
|
|
|
|
|
|
m.Start()
|
|
|
|
@ -250,7 +250,7 @@ func TestShutdown(t *testing.T) {
|
|
|
|
|
mcfg := config.DefaultMetadataConfig
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
|
|
|
|
|
samples, series := createTimeseries(n, n)
|
|
|
|
|
m.StoreSeries(series, 0)
|
|
|
|
@ -288,11 +288,11 @@ func TestSeriesReset(t *testing.T) {
|
|
|
|
|
cfg := config.DefaultQueueConfig
|
|
|
|
|
mcfg := config.DefaultMetadataConfig
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
for i := 0; i < numSegments; i++ {
|
|
|
|
|
series := []record.RefSeries{}
|
|
|
|
|
for j := 0; j < numSeries; j++ {
|
|
|
|
|
series = append(series, record.RefSeries{Ref: chunks.HeadSeriesRef((i * 100) + j), Labels: labels.Labels{{Name: "a", Value: "a"}}})
|
|
|
|
|
series = append(series, record.RefSeries{Ref: chunks.HeadSeriesRef((i * 100) + j), Labels: labels.FromStrings("a", "a")})
|
|
|
|
|
}
|
|
|
|
|
m.StoreSeries(series, i)
|
|
|
|
|
}
|
|
|
|
@ -317,7 +317,7 @@ func TestReshard(t *testing.T) {
|
|
|
|
|
dir := t.TempDir()
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.StoreSeries(series, 0)
|
|
|
|
|
|
|
|
|
|
m.Start()
|
|
|
|
@ -353,7 +353,7 @@ func TestReshardRaceWithStop(t *testing.T) {
|
|
|
|
|
go func() {
|
|
|
|
|
for {
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.Start()
|
|
|
|
|
h.Unlock()
|
|
|
|
|
h.Lock()
|
|
|
|
@ -388,7 +388,7 @@ func TestReshardPartialBatch(t *testing.T) {
|
|
|
|
|
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.StoreSeries(series, 0)
|
|
|
|
|
|
|
|
|
|
m.Start()
|
|
|
|
@ -433,7 +433,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.StoreSeries(series, 0)
|
|
|
|
|
m.Start()
|
|
|
|
|
defer m.Stop()
|
|
|
|
@ -460,20 +460,15 @@ func TestReleaseNoninternedString(t *testing.T) {
|
|
|
|
|
mcfg := config.DefaultMetadataConfig
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
c := NewTestWriteClient()
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.Start()
|
|
|
|
|
defer m.Stop()
|
|
|
|
|
|
|
|
|
|
for i := 1; i < 1000; i++ {
|
|
|
|
|
m.StoreSeries([]record.RefSeries{
|
|
|
|
|
{
|
|
|
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
|
|
|
Labels: labels.Labels{
|
|
|
|
|
labels.Label{
|
|
|
|
|
Name: "asdf",
|
|
|
|
|
Value: fmt.Sprintf("%d", i),
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
|
|
|
Labels: labels.FromStrings("asdf", fmt.Sprintf("%d", i)),
|
|
|
|
|
},
|
|
|
|
|
}, 0)
|
|
|
|
|
m.SeriesReset(1)
|
|
|
|
@ -512,7 +507,7 @@ func TestShouldReshard(t *testing.T) {
|
|
|
|
|
for _, c := range cases {
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
client := NewTestWriteClient()
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.numShards = c.startingShards
|
|
|
|
|
m.dataIn.incr(c.samplesIn)
|
|
|
|
|
m.dataOut.incr(c.samplesOut)
|
|
|
|
@ -565,19 +560,14 @@ func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []recor
|
|
|
|
|
}
|
|
|
|
|
series = append(series, record.RefSeries{
|
|
|
|
|
Ref: chunks.HeadSeriesRef(i),
|
|
|
|
|
Labels: labels.Labels{{Name: "__name__", Value: name}},
|
|
|
|
|
Labels: labels.FromStrings("__name__", name),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return exemplars, series
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func getSeriesNameFromRef(r record.RefSeries) string {
|
|
|
|
|
for _, l := range r.Labels {
|
|
|
|
|
if l.Name == "__name__" {
|
|
|
|
|
return l.Value
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return ""
|
|
|
|
|
return r.Labels.Get("__name__")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type TestWriteClient struct {
|
|
|
|
@ -679,13 +669,8 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
|
|
|
|
|
|
|
|
|
|
count := 0
|
|
|
|
|
for _, ts := range reqProto.Timeseries {
|
|
|
|
|
var seriesName string
|
|
|
|
|
labels := labelProtosToLabels(ts.Labels)
|
|
|
|
|
for _, label := range labels {
|
|
|
|
|
if label.Name == "__name__" {
|
|
|
|
|
seriesName = label.Value
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
seriesName := labels.Get("__name__")
|
|
|
|
|
for _, sample := range ts.Samples {
|
|
|
|
|
count++
|
|
|
|
|
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
|
|
|
|
@ -791,7 +776,7 @@ func BenchmarkSampleSend(b *testing.B) {
|
|
|
|
|
dir := b.TempDir()
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.StoreSeries(series, 0)
|
|
|
|
|
|
|
|
|
|
// These should be received by the client.
|
|
|
|
@ -837,7 +822,7 @@ func BenchmarkStartup(b *testing.B) {
|
|
|
|
|
c := NewTestBlockedWriteClient()
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, logger, dir,
|
|
|
|
|
newEWMARate(ewmaWeight, shardUpdateDuration),
|
|
|
|
|
cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
|
|
|
|
|
m.watcher.MaxSegment = segments[len(segments)-2]
|
|
|
|
|
err := m.watcher.Run()
|
|
|
|
@ -913,7 +898,7 @@ func TestCalculateDesiredShards(t *testing.T) {
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
|
|
|
|
|
// Need to start the queue manager so the proper metrics are initialized.
|
|
|
|
|
// However we can stop it right away since we don't need to do any actual
|
|
|
|
@ -990,7 +975,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
|
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "")
|
|
|
|
|
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false)
|
|
|
|
|
|
|
|
|
|
for _, tc := range []struct {
|
|
|
|
|
name string
|
|
|
|
|