prometheus/tsdb/head_test.go

1550 lines
45 KiB
Go
Raw Normal View History

2017-04-10 18:59:45 +00:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"fmt"
"io/ioutil"
"math"
2017-09-04 13:07:30 +00:00
"math/rand"
"os"
"path"
"path/filepath"
"sort"
"strconv"
"sync"
"testing"
"github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal"
"github.com/prometheus/prometheus/util/testutil"
)
// BenchmarkCreateSeries measures the cost of Head.getOrCreate when it is
// called once for each of b.N generated series on a head without a WAL.
func BenchmarkCreateSeries(b *testing.B) {
	generated := genSeries(b.N, 10, 0, 0)

	h, err := NewHead(nil, nil, nil, 10000, DefaultStripeSize)
	testutil.Ok(b, err)
	defer h.Close()

	// Series generation and head construction are excluded from the timing.
	b.ReportAllocs()
	b.ResetTimer()

	for _, ser := range generated {
		hash := ser.Labels().Hash()
		h.getOrCreate(hash, ser.Labels())
	}
}
2017-02-14 23:54:52 +00:00
// populateTestWAL encodes every entry of recs with a record.Encoder and logs
// the result to w.
//
// Supported entry types are []record.RefSeries, []record.RefSample and
// []tombstones.Stone. Any other type fails the test immediately instead of
// being silently skipped, so a mistyped fixture cannot produce an
// unexpectedly empty WAL.
func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) {
	var enc record.Encoder
	for _, r := range recs {
		switch v := r.(type) {
		case []record.RefSeries:
			testutil.Ok(t, w.Log(enc.Series(v, nil)))
		case []record.RefSample:
			testutil.Ok(t, w.Log(enc.Samples(v, nil)))
		case []tombstones.Stone:
			testutil.Ok(t, w.Log(enc.Tombstones(v, nil)))
		default:
			t.Fatalf("unsupported record type %T", r)
		}
	}
}
// readTestWAL reads all WAL segments found in dir and returns the decoded
// records in the order they were read. Each element of the result is a
// []record.RefSeries, a []record.RefSample or a []tombstones.Stone. A record
// of any other type, a decoding error, or a reader error fails the test.
func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
	segments, err := wal.NewSegmentsReader(dir)
	testutil.Ok(t, err)
	defer segments.Close()

	var (
		dec    record.Decoder
		reader = wal.NewReader(segments)
	)
	for reader.Next() {
		var (
			raw     = reader.Record()
			decoded interface{}
			decErr  error
		)
		switch dec.Type(raw) {
		case record.Series:
			decoded, decErr = dec.Series(raw, nil)
		case record.Samples:
			decoded, decErr = dec.Samples(raw, nil)
		case record.Tombstones:
			decoded, decErr = dec.Tombstones(raw, nil)
		default:
			t.Fatalf("unknown record type")
		}
		testutil.Ok(t, decErr)
		recs = append(recs, decoded)
	}
	testutil.Ok(t, reader.Err())
	return recs
}
// BenchmarkLoadWAL measures how long it takes a fresh Head to replay a WAL.
//
// For every case a WAL is written into a temporary directory: first the
// series records (one record per batch), then the sample records (one record
// per batch for every sample index). Only the NewHead + Init loop is timed.
func BenchmarkLoadWAL(b *testing.B) {
	cases := []struct {
		// Total series is (batches*seriesPerBatch).
		batches          int
		seriesPerBatch   int
		samplesPerSeries int
	}{
		{ // Fewer series and more samples. 2 hour WAL with 1 second scrape interval.
			batches:          10,
			seriesPerBatch:   100,
			samplesPerSeries: 7200,
		},
		{ // More series and fewer samples.
			batches:          10,
			seriesPerBatch:   10000,
			samplesPerSeries: 50,
		},
		{ // In between.
			batches:          10,
			seriesPerBatch:   1000,
			samplesPerSeries: 480,
		},
	}

	labelsPerSeries := 5
	for _, c := range cases {
		b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries),
			func(b *testing.B) {
				dir, err := ioutil.TempDir("", "test_load_wal")
				testutil.Ok(b, err)
				defer func() {
					testutil.Ok(b, os.RemoveAll(dir))
				}()

				w, err := wal.New(nil, nil, dir, false)
				testutil.Ok(b, err)
				// Registered after the RemoveAll defer, so the WAL is closed
				// before its directory is deleted.
				defer w.Close()

				// Write series.
				refSeries := make([]record.RefSeries, 0, c.seriesPerBatch)
				for k := 0; k < c.batches; k++ {
					refSeries = refSeries[:0]
					for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ {
						lbls := make(map[string]string, labelsPerSeries)
						lbls[defaultLabelName] = strconv.Itoa(i)
						// Pad with filler labels until the series has labelsPerSeries labels.
						for j := 1; len(lbls) < labelsPerSeries; j++ {
							lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
						}
						refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 100, Labels: labels.FromMap(lbls)})
					}
					populateTestWAL(b, w, []interface{}{refSeries})
				}

				// Write samples.
				refSamples := make([]record.RefSample, 0, c.seriesPerBatch)
				for i := 0; i < c.samplesPerSeries; i++ {
					for j := 0; j < c.batches; j++ {
						refSamples = refSamples[:0]
						for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
							refSamples = append(refSamples, record.RefSample{
								Ref: uint64(k) * 100,
								T:   int64(i) * 10,
								V:   float64(i) * 100,
							})
						}
						populateTestWAL(b, w, []interface{}{refSamples})
					}
				}

				b.ResetTimer()

				// Load the WAL.
				for i := 0; i < b.N; i++ {
					h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize)
					testutil.Ok(b, err)
					// A failed replay must fail the benchmark rather than
					// produce a meaningless timing.
					testutil.Ok(b, h.Init(0))
				}
			})
	}
}
// TestHead_ReadWAL writes series, sample and tombstone records to a WAL (with
// and without compression), initializes a Head from it, and verifies the
// resulting in-memory series and their samples.
func TestHead_ReadWAL(t *testing.T) {
	for _, compress := range []bool{false, true} {
		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
			walRecords := []interface{}{
				[]record.RefSeries{
					{Ref: 10, Labels: labels.FromStrings("a", "1")},
					{Ref: 11, Labels: labels.FromStrings("a", "2")},
					{Ref: 100, Labels: labels.FromStrings("a", "3")},
				},
				[]record.RefSample{
					{Ref: 0, T: 99, V: 1},
					{Ref: 10, T: 100, V: 2},
					{Ref: 100, T: 100, V: 3},
				},
				[]record.RefSeries{
					{Ref: 50, Labels: labels.FromStrings("a", "4")},
					// This series has two refs pointing to it.
					{Ref: 101, Labels: labels.FromStrings("a", "3")},
				},
				[]record.RefSample{
					{Ref: 10, T: 101, V: 5},
					{Ref: 50, T: 101, V: 6},
					{Ref: 101, T: 101, V: 7},
				},
				[]tombstones.Stone{
					{Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}},
				},
			}

			dir, err := ioutil.TempDir("", "test_read_wal")
			testutil.Ok(t, err)
			defer func() {
				testutil.Ok(t, os.RemoveAll(dir))
			}()

			w, err := wal.New(nil, nil, dir, compress)
			testutil.Ok(t, err)
			defer w.Close()

			populateTestWAL(t, w, walRecords)

			h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize)
			testutil.Ok(t, err)
			testutil.Ok(t, h.Init(math.MinInt64))
			testutil.Equals(t, uint64(101), h.lastSeriesID)

			var (
				ser10  = h.series.getByID(10)
				ser11  = h.series.getByID(11)
				ser50  = h.series.getByID(50)
				ser100 = h.series.getByID(100)
			)
			testutil.Equals(t, labels.FromStrings("a", "1"), ser10.lset)
			testutil.Equals(t, (*memSeries)(nil), ser11) // Series without samples should be garbage collected at head.Init().
			testutil.Equals(t, labels.FromStrings("a", "4"), ser50.lset)
			testutil.Equals(t, labels.FromStrings("a", "3"), ser100.lset)

			// drain collects every sample the iterator yields and fails the
			// test if the iterator reports an error.
			drain := func(it chunkenc.Iterator) []sample {
				var out []sample
				for it.Next() {
					ts, val := it.At()
					out = append(out, sample{t: ts, v: val})
				}
				testutil.Ok(t, it.Err())
				return out
			}

			testutil.Equals(t, []sample{{100, 2}, {101, 5}}, drain(ser10.iterator(0, nil, nil)))
			testutil.Equals(t, []sample{{101, 6}}, drain(ser50.iterator(0, nil, nil)))
			testutil.Equals(t, []sample{{100, 3}, {101, 7}}, drain(ser100.iterator(0, nil, nil)))
		})
	}
}
// TestHead_WALMultiRef appends to the same label set before and after a head
// truncation, requires that the two appends received different refs, and then
// checks that a head rebuilt from the same WAL directory returns both samples
// under one series.
func TestHead_WALMultiRef(t *testing.T) {
	dir, err := ioutil.TempDir("", "test_wal_multi_ref")
	testutil.Ok(t, err)
	defer func() {
		testutil.Ok(t, os.RemoveAll(dir))
	}()

	firstWAL, err := wal.New(nil, nil, dir, false)
	testutil.Ok(t, err)

	firstHead, err := NewHead(nil, nil, firstWAL, 10000, DefaultStripeSize)
	testutil.Ok(t, err)
	testutil.Ok(t, firstHead.Init(0))

	lset := func() labels.Labels { return labels.FromStrings("foo", "bar") }

	// First append, followed by a truncation past its timestamp.
	app := firstHead.Appender()
	ref1, err := app.Add(lset(), 100, 1)
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())

	testutil.Ok(t, firstHead.Truncate(200))

	// Second append to the same label set.
	app = firstHead.Appender()
	ref2, err := app.Add(lset(), 300, 2)
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())

	if ref1 == ref2 {
		t.Fatal("Refs are the same")
	}
	testutil.Ok(t, firstHead.Close())

	// Rebuild a head from the same WAL directory.
	secondWAL, err := wal.New(nil, nil, dir, false)
	testutil.Ok(t, err)

	secondHead, err := NewHead(nil, nil, secondWAL, 10000, DefaultStripeSize)
	testutil.Ok(t, err)
	testutil.Ok(t, secondHead.Init(0))
	defer secondHead.Close()

	q, err := NewBlockQuerier(secondHead, 0, 300)
	testutil.Ok(t, err)

	got := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
	want := map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{100, 1}, sample{300, 2}}}
	testutil.Equals(t, want, got)
}
2017-09-01 12:38:49 +00:00
func TestHead_Truncate(t *testing.T) {
dir, err := ioutil.TempDir("", "test_truncate")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir, false)
testutil.Ok(t, err)
h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize)
testutil.Ok(t, err)
defer h.Close()
2017-09-01 12:38:49 +00:00
h.initTime(0)
2017-09-18 10:28:56 +00:00
s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"))
s2, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"))
s3, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"))
s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
2017-09-01 12:38:49 +00:00
s1.chunks = []*memChunk{
{minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()},
{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
2017-09-01 12:38:49 +00:00
}
s2.chunks = []*memChunk{
{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
{minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()},
2017-09-01 12:38:49 +00:00
}
s3.chunks = []*memChunk{
{minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()},
{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
2017-09-01 12:38:49 +00:00
}
s4.chunks = []*memChunk{}
// Truncation need not be aligned.
testutil.Ok(t, h.Truncate(1))
2017-09-01 12:38:49 +00:00
testutil.Ok(t, h.Truncate(2000))
2017-09-01 12:38:49 +00:00
testutil.Equals(t, []*memChunk{
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
}, h.series.getByID(s1.ref).chunks)
2017-09-01 12:38:49 +00:00
testutil.Equals(t, []*memChunk{
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
{minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()},
}, h.series.getByID(s2.ref).chunks)
2017-09-01 12:38:49 +00:00
2017-12-08 21:42:08 +00:00
testutil.Assert(t, h.series.getByID(s3.ref) == nil, "")
testutil.Assert(t, h.series.getByID(s4.ref) == nil, "")
2017-09-01 12:38:49 +00:00
postingsA1, _ := index.ExpandPostings(h.postings.Get("a", "1"))
postingsA2, _ := index.ExpandPostings(h.postings.Get("a", "2"))
postingsB1, _ := index.ExpandPostings(h.postings.Get("b", "1"))
postingsB2, _ := index.ExpandPostings(h.postings.Get("b", "2"))
postingsC1, _ := index.ExpandPostings(h.postings.Get("c", "1"))
postingsAll, _ := index.ExpandPostings(h.postings.Get("", ""))
2017-09-01 12:38:49 +00:00
testutil.Equals(t, []uint64{s1.ref}, postingsA1)
testutil.Equals(t, []uint64{s2.ref}, postingsA2)
testutil.Equals(t, []uint64{s1.ref, s2.ref}, postingsB1)
testutil.Equals(t, []uint64{s1.ref, s2.ref}, postingsAll)
2017-12-08 21:42:08 +00:00
testutil.Assert(t, postingsB2 == nil, "")
testutil.Assert(t, postingsC1 == nil, "")
2017-09-01 12:38:49 +00:00
testutil.Equals(t, map[string]struct{}{
"": {}, // from 'all' postings list
"a": {},
"b": {},
"1": {},
"2": {},
2017-09-01 12:38:49 +00:00
}, h.symbols)
testutil.Equals(t, map[string]stringset{
"a": {"1": struct{}{}, "2": struct{}{}},
"b": {"1": struct{}{}},
"": {"": struct{}{}},
2017-09-01 12:38:49 +00:00
}, h.values)
}
// Validate various behaviors brought on by firstChunkID accounting for
// garbage collected chunks.
func TestMemSeries_truncateChunks(t *testing.T) {
s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000)
for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0)
testutil.Assert(t, ok == true, "sample append failed")
2017-09-01 12:38:49 +00:00
}
// Check that truncate removes half of the chunks and afterwards
// that the ID of the last chunk still gives us the same chunk afterwards.
countBefore := len(s.chunks)
lastID := s.chunkID(countBefore - 1)
lastChunk := s.chunk(lastID)
2017-12-08 21:42:08 +00:00
testutil.Assert(t, s.chunk(0) != nil, "")
testutil.Assert(t, lastChunk != nil, "")
2017-09-01 12:38:49 +00:00
s.truncateChunksBefore(2000)
testutil.Equals(t, int64(2000), s.chunks[0].minTime)
2017-12-08 21:42:08 +00:00
testutil.Assert(t, s.chunk(0) == nil, "first chunks not gone")
testutil.Equals(t, countBefore/2, len(s.chunks))
testutil.Equals(t, lastChunk, s.chunk(lastID))
2017-09-01 12:38:49 +00:00
// Validate that the series' sample buffer is applied correctly to the last chunk
// after truncation.
it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil, nil)
2017-09-01 12:38:49 +00:00
_, ok := it1.(*memSafeIterator)
testutil.Assert(t, ok == true, "")
2017-09-01 12:38:49 +00:00
it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil, nil)
2017-09-01 12:38:49 +00:00
_, ok = it2.(*memSafeIterator)
testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer")
2017-09-01 12:38:49 +00:00
}
2018-02-07 13:43:21 +00:00
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
entries := []interface{}{
[]record.RefSeries{
{Ref: 10, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{},
[]record.RefSeries{
{Ref: 50, Labels: labels.FromStrings("a", "2")},
},
[]record.RefSample{
{Ref: 50, T: 80, V: 1},
{Ref: 50, T: 90, V: 1},
},
}
dir, err := ioutil.TempDir("", "test_delete_series")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir, compress)
testutil.Ok(t, err)
defer w.Close()
populateTestWAL(t, w, entries)
2018-02-07 13:43:21 +00:00
head, err := NewHead(nil, nil, w, 1000, DefaultStripeSize)
testutil.Ok(t, err)
2018-02-07 13:43:21 +00:00
testutil.Ok(t, head.Init(math.MinInt64))
2018-02-07 13:43:21 +00:00
testutil.Ok(t, head.Delete(0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "1")))
})
}
2018-02-07 13:43:21 +00:00
}
2017-09-04 13:07:30 +00:00
func TestHeadDeleteSimple(t *testing.T) {
buildSmpls := func(s []int64) []sample {
ss := make([]sample, 0, len(s))
for _, t := range s {
ss = append(ss, sample{t: t, v: float64(t)})
}
return ss
2017-09-04 13:07:30 +00:00
}
smplsAll := buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
lblDefault := labels.Label{Name: "a", Value: "b"}
2017-09-04 13:07:30 +00:00
cases := []struct {
dranges tombstones.Intervals
addSamples []sample // Samples to add after delete.
smplsExp []sample
2017-09-04 13:07:30 +00:00
}{
{
dranges: tombstones.Intervals{{Mint: 0, Maxt: 3}},
smplsExp: buildSmpls([]int64{4, 5, 6, 7, 8, 9}),
2017-09-04 13:07:30 +00:00
},
{
dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}},
smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9}),
2017-09-04 13:07:30 +00:00
},
{
dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
smplsExp: buildSmpls([]int64{0, 8, 9}),
2017-09-04 13:07:30 +00:00
},
{
dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 700}},
smplsExp: buildSmpls([]int64{0}),
2017-09-04 13:07:30 +00:00
},
{ // This case is to ensure that labels and symbols are deleted.
dranges: tombstones.Intervals{{Mint: 0, Maxt: 9}},
smplsExp: buildSmpls([]int64{}),
2017-09-04 13:07:30 +00:00
},
{
dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}},
addSamples: buildSmpls([]int64{11, 13, 15}),
smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9, 11, 13, 15}),
},
{
// After delete, the appended samples in the deleted range should be visible
// as the tombstones are clamped to head min/max time.
dranges: tombstones.Intervals{{Mint: 7, Maxt: 20}},
addSamples: buildSmpls([]int64{11, 13, 15}),
smplsExp: buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 11, 13, 15}),
},
2017-09-04 13:07:30 +00:00
}
for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
for _, c := range cases {
dir, err := ioutil.TempDir("", "test_wal_reload")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, path.Join(dir, "wal"), compress)
testutil.Ok(t, err)
defer w.Close()
head, err := NewHead(nil, nil, w, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer head.Close()
app := head.Appender()
for _, smpl := range smplsAll {
_, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
// Delete the ranges.
for _, r := range c.dranges {
testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)))
}
// Add more samples.
app = head.Appender()
for _, smpl := range c.addSamples {
_, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
// Compare the samples for both heads - before and after the reload.
reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload.
testutil.Ok(t, err)
defer reloadedW.Close()
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer reloadedHead.Close()
testutil.Ok(t, reloadedHead.Init(0))
// Compare the query results for both heads - before and after the reload.
Outer:
for _, h := range []*Head{head, reloadedHead} {
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
testutil.Ok(t, err)
actSeriesSet, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
expSeriesSet := newMockSeriesSet([]storage.Series{
newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample {
ss := make([]tsdbutil.Sample, 0, len(c.smplsExp))
for _, s := range c.smplsExp {
ss = append(ss, s)
}
return ss
}(),
),
})
for {
eok, rok := expSeriesSet.Next(), actSeriesSet.Next()
testutil.Equals(t, eok, rok)
if !eok {
testutil.Ok(t, h.Close())
continue Outer
}
expSeries := expSeriesSet.At()
actSeries := actSeriesSet.At()
testutil.Equals(t, expSeries.Labels(), actSeries.Labels())
smplExp, errExp := expandSeriesIterator(expSeries.Iterator())
smplRes, errRes := expandSeriesIterator(actSeries.Iterator())
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
}
}
}
})
2017-09-04 13:07:30 +00:00
}
}
func TestDeleteUntilCurMax(t *testing.T) {
numSamples := int64(10)
hb, err := NewHead(nil, nil, nil, 1000000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
app := hb.Appender()
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
testutil.Ok(t, hb.Delete(0, 10000, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
// Test the series returns no samples. The series is cleared only after compaction.
q, err := NewBlockQuerier(hb, 0, 100000)
testutil.Ok(t, err)
res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
testutil.Assert(t, res.Next(), "series is not present")
s := res.At()
it := s.Iterator()
testutil.Assert(t, !it.Next(), "expected no samples")
// Add again and test for presence.
app = hb.Appender()
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 11, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
q, err = NewBlockQuerier(hb, 0, 100000)
testutil.Ok(t, err)
res, ws, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
testutil.Assert(t, res.Next(), "series don't exist")
exps := res.At()
it = exps.Iterator()
Spelling (#6517) * spelling: alertmanager Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: attributes Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: autocomplete Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: bootstrap Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: caught Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: chunkenc Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: compaction Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: corrupted Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: deletable Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: expected Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: fine-grained Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: initialized Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: iteration Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: javascript Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: multiple Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: number Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: overlapping Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: possible Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: postings Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: procedure Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: programmatic Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: queuing Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: querier Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: repairing Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * 
spelling: received Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: reproducible Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: retention Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: sample Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: segements Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: semantic Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: software [LICENSE] Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: staging Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: timestamp Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: unfortunately Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: uvarint Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: subsequently Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: ressamples Signed-off-by: Josh Soref <jsoref@users.noreply.github.com>
2020-01-02 14:54:09 +00:00
resSamples, err := expandSeriesIterator(it)
testutil.Ok(t, err)
Spelling (#6517) * spelling: alertmanager Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: attributes Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: autocomplete Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: bootstrap Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: caught Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: chunkenc Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: compaction Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: corrupted Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: deletable Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: expected Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: fine-grained Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: initialized Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: iteration Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: javascript Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: multiple Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: number Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: overlapping Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: possible Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: postings Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: procedure Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: programmatic Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: queuing Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: querier Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: repairing Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * 
spelling: received Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: reproducible Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: retention Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: sample Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: segements Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: semantic Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: software [LICENSE] Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: staging Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: timestamp Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: unfortunately Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: uvarint Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: subsequently Signed-off-by: Josh Soref <jsoref@users.noreply.github.com> * spelling: ressamples Signed-off-by: Josh Soref <jsoref@users.noreply.github.com>
2020-01-02 14:54:09 +00:00
testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, resSamples)
}
// TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint commits 10000 samples
// one at a time into a WAL created with a segment size of 32768, deletes them
// all, truncates and closes the head, and then counts the record types found
// in the last checkpoint plus the WAL directory.
func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
	dir, err := ioutil.TempDir("", "test_delete_wal")
	testutil.Ok(t, err)
	defer func() {
		testutil.Ok(t, os.RemoveAll(dir))
	}()

	segmentedWAL, err := wal.NewSize(nil, nil, dir, 32768, false)
	testutil.Ok(t, err)

	// Enough samples to cause a checkpoint.
	numSamples := 10000
	hb, err := NewHead(nil, nil, segmentedWAL, int64(numSamples)*10, DefaultStripeSize)
	testutil.Ok(t, err)
	defer hb.Close()

	// Every sample gets its own appender and commit.
	for ts := 0; ts < numSamples; ts++ {
		app := hb.Appender()
		_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, int64(ts), 0)
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())
	}

	matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
	testutil.Ok(t, hb.Delete(0, int64(numSamples), matcher))
	testutil.Ok(t, hb.Truncate(1))
	testutil.Ok(t, hb.Close())

	// Confirm there's been a checkpoint.
	cdir, _, err := wal.LastCheckpoint(dir)
	testutil.Ok(t, err)

	// Read in checkpoint and WAL.
	allRecs := readTestWAL(t, cdir)
	allRecs = append(allRecs, readTestWAL(t, dir)...)

	var (
		seriesRecs int
		sampleRecs int
		stoneRecs  int
	)
	for i := range allRecs {
		switch allRecs[i].(type) {
		case []record.RefSeries:
			seriesRecs++
		case []record.RefSample:
			sampleRecs++
		case []tombstones.Stone:
			stoneRecs++
		default:
			t.Fatalf("unknown record type")
		}
	}
	testutil.Equals(t, 1, seriesRecs)
	testutil.Equals(t, 9999, sampleRecs)
	testutil.Equals(t, 1, stoneRecs)
}
// TestDelete_e2e appends randomly timed samples to eight series, applies
// several deletions selected by label matchers, and repeatedly compares the
// query result of the head with the samples expected after removing the
// deleted ranges.
func TestDelete_e2e(t *testing.T) {
	numDatapoints := 1000
	numRanges := 1000
	timeInterval := int64(2)

	// Create 8 series with 1000 data-points of different ranges, delete and run queries.
	lbls := [][]labels.Label{
		{
			{Name: "a", Value: "b"},
			{Name: "instance", Value: "localhost:9090"},
			{Name: "job", Value: "prometheus"},
		},
		{
			{Name: "a", Value: "b"},
			{Name: "instance", Value: "127.0.0.1:9090"},
			{Name: "job", Value: "prometheus"},
		},
		{
			{Name: "a", Value: "b"},
			{Name: "instance", Value: "127.0.0.1:9090"},
			{Name: "job", Value: "prom-k8s"},
		},
		{
			{Name: "a", Value: "b"},
			{Name: "instance", Value: "localhost:9090"},
			{Name: "job", Value: "prom-k8s"},
		},
		{
			{Name: "a", Value: "c"},
			{Name: "instance", Value: "localhost:9090"},
			{Name: "job", Value: "prometheus"},
		},
		{
			{Name: "a", Value: "c"},
			{Name: "instance", Value: "127.0.0.1:9090"},
			{Name: "job", Value: "prometheus"},
		},
		{
			{Name: "a", Value: "c"},
			{Name: "instance", Value: "127.0.0.1:9090"},
			{Name: "job", Value: "prom-k8s"},
		},
		{
			{Name: "a", Value: "c"},
			{Name: "instance", Value: "localhost:9090"},
			{Name: "job", Value: "prom-k8s"},
		},
	}

	seriesMap := map[string][]tsdbutil.Sample{}
	for _, l := range lbls {
		seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
	}

	dir, err := ioutil.TempDir("", "test")
	testutil.Ok(t, err)
	defer func() {
		testutil.Ok(t, os.RemoveAll(dir))
	}()

	hb, err := NewHead(nil, nil, nil, 100000, DefaultStripeSize)
	testutil.Ok(t, err)
	defer hb.Close()

	app := hb.Appender()
	for _, l := range lbls {
		ls := labels.New(l...)
		series := []tsdbutil.Sample{}

		// Start at a random timestamp and advance by a random positive step.
		ts := rand.Int63n(300)
		for i := 0; i < numDatapoints; i++ {
			v := rand.Float64()
			_, err := app.Add(ls, ts, v)
			testutil.Ok(t, err)
			series = append(series, sample{ts, v})
			ts += rand.Int63n(timeInterval) + 1
		}
		seriesMap[labels.New(l...).String()] = series
	}
	testutil.Ok(t, app.Commit())

	// Delete a time-range from each-selector.
	dels := []struct {
		ms     []*labels.Matcher
		drange tombstones.Intervals
	}{
		{
			ms:     []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "b")},
			drange: tombstones.Intervals{{Mint: 300, Maxt: 500}, {Mint: 600, Maxt: 670}},
		},
		{
			ms: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "a", "b"),
				labels.MustNewMatcher(labels.MatchEqual, "job", "prom-k8s"),
			},
			drange: tombstones.Intervals{{Mint: 300, Maxt: 500}, {Mint: 100, Maxt: 670}},
		},
		{
			ms: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "a", "c"),
				labels.MustNewMatcher(labels.MatchEqual, "instance", "localhost:9090"),
				labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"),
			},
			drange: tombstones.Intervals{{Mint: 300, Maxt: 400}, {Mint: 100, Maxt: 6700}},
		},
		// TODO: Add Regexp Matchers.
	}

	for _, del := range dels {
		for _, r := range del.drange {
			testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...))
		}

		// Collect the label sets selected by this deletion's matchers.
		matched := labels.Slice{}
		for _, ls := range lbls {
			s := labels.Selector(del.ms)
			if s.Matches(ls) {
				matched = append(matched, ls)
			}
		}
		sort.Sort(matched)

		for i := 0; i < numRanges; i++ {
			// Each round runs in its own function so the querier is closed
			// when the round ends instead of accumulating deferred closes
			// until the test returns.
			func() {
				q, err := NewBlockQuerier(hb, 0, 100000)
				testutil.Ok(t, err)
				defer q.Close()

				ss, ws, err := q.Select(true, nil, del.ms...)
				testutil.Ok(t, err)
				testutil.Equals(t, 0, len(ws))

				// Build the mockSeriesSet.
				matchedSeries := make([]storage.Series, 0, len(matched))
				for _, m := range matched {
					smpls := seriesMap[m.String()]
					smpls = deletedSamples(smpls, del.drange)
					// Only append those series for which samples exist as mockSeriesSet
					// doesn't skip series with no samples.
					// TODO: But sometimes SeriesSet returns an empty SeriesIterator
					if len(smpls) > 0 {
						matchedSeries = append(matchedSeries, newSeries(
							m.Map(),
							smpls,
						))
					}
				}
				expSs := newMockSeriesSet(matchedSeries)

				// Compare both SeriesSets.
				for {
					eok, rok := expSs.Next(), ss.Next()
					// Skip a series if iterator is empty.
					if rok {
						for !ss.At().Iterator().Next() {
							rok = ss.Next()
							if !rok {
								break
							}
						}
					}
					testutil.Equals(t, eok, rok)

					if !eok {
						break
					}
					sexp := expSs.At()
					sres := ss.At()

					testutil.Equals(t, sexp.Labels(), sres.Labels())

					smplExp, errExp := expandSeriesIterator(sexp.Iterator())
					smplRes, errRes := expandSeriesIterator(sres.Iterator())

					testutil.Equals(t, errExp, errRes)
					testutil.Equals(t, smplExp, smplRes)
				}
			}()
		}
	}
}
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. 
MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 13:29:41 +00:00
func boundedSamples(full []tsdbutil.Sample, mint, maxt int64) []tsdbutil.Sample {
for len(full) > 0 {
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. 
MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 13:29:41 +00:00
if full[0].T() >= mint {
break
}
full = full[1:]
}
for i, s := range full {
// labels.Labelinate on the first sample larger than maxt.
Vertical query merging and compaction (#370) * Vertical series iterator Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Select overlapped blocks first in compactor Plan() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Added vertical compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Code cleanup and comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix tests Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Add benchmark for compaction Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Perform vertical compaction only when blocks are overlapping. Actions for vertical compaction: * Sorting chunk metas * Calling chunks.MergeOverlappingChunks on the chunks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for vertical compaction * BenchmarkNormalCompaction => BenchmarkCompaction * Moved the benchmark from db_test.go to compact_test.go Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Benchmark for query iterator and seek for non overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Vertical query merge only for overlapping blocks Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Simplify logging in Compact(...) Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG.md Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Calculate overlapping inside populateBlock Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * MinTime and MaxTime for BlockReader. Using this to find overlapping blocks in populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Sort blocks w.r.t. 
MinTime in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping in LeveledCompactor.write() instead of returning bool Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Log about overlapping inside LeveledCompactor.populateBlock() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor createBlock to take optional []Series Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * review1 Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com> * Updated CHANGELOG and minor nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * nits Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Updated CHANGELOG Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Refactor iterator and seek benchmarks for Querier. Also has as overlapping blocks. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Additional test case Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek. Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Split genSeries into genSeries and populateSeries Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Check error in benchmark Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Warn about overlapping blocks in reload() Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
2019-02-14 13:29:41 +00:00
if s.T() > maxt {
return full[:i]
}
}
// maxt is after highest sample.
return full
}
// deletedSamples returns a newly allocated slice holding, in their original
// order, the samples of full whose timestamp is not in bounds of any interval
// in dranges.
func deletedSamples(full []tsdbutil.Sample, dranges tombstones.Intervals) []tsdbutil.Sample {
	kept := make([]tsdbutil.Sample, 0, len(full))
	for _, sample := range full {
		deleted := false
		for _, interval := range dranges {
			if interval.InBounds(sample.T()) {
				// One covering interval is enough to drop the sample.
				deleted = true
				break
			}
		}
		if !deleted {
			kept = append(kept, sample)
		}
	}
	return kept
}
func TestComputeChunkEndTime(t *testing.T) {
cases := []struct {
start, cur, max int64
res int64
}{
{
start: 0,
cur: 250,
max: 1000,
res: 1000,
},
{
start: 100,
cur: 200,
max: 1000,
res: 550,
},
// Case where we fit floored 0 chunks. Must catch division by 0
// and default to maximum time.
{
start: 0,
cur: 500,
max: 1000,
res: 1000,
},
2018-04-08 09:28:30 +00:00
// Catch division by zero for cur == start. Strictly not a possible case.
{
start: 100,
cur: 100,
max: 1000,
res: 104,
},
}
for _, c := range cases {
got := computeChunkEndTime(c.start, c.cur, c.max)
if got != c.res {
t.Errorf("expected %d for (start: %d, cur: %d, max: %d), got %d", c.res, c.start, c.cur, c.max, got)
}
}
}
// TestMemSeries_append checks how memSeries.append cuts chunks: a new chunk
// must start at the chunk range boundary, and when many samples are appended
// the intermediate chunks must each hold more than 100 samples.
func TestMemSeries_append(t *testing.T) {
	s := newMemSeries(labels.Labels{}, 1, 500)

	// Add first two samples at the very end of a chunk range and the next two
	// on and after it.
	// New chunk must correctly be cut at 1000.
	ok, chunkCreated := s.append(998, 1, 0)
	testutil.Assert(t, ok, "append failed")
	testutil.Assert(t, chunkCreated, "first sample created chunk")

	ok, chunkCreated = s.append(999, 2, 0)
	testutil.Assert(t, ok, "append failed")
	testutil.Assert(t, !chunkCreated, "second sample should use same chunk")

	ok, chunkCreated = s.append(1000, 3, 0)
	testutil.Assert(t, ok, "append failed")
	testutil.Assert(t, chunkCreated, "expected new chunk on boundary")

	ok, chunkCreated = s.append(1001, 4, 0)
	testutil.Assert(t, ok, "append failed")
	// This is the fourth sample; the message previously claimed "second".
	testutil.Assert(t, !chunkCreated, "fourth sample should use same chunk")

	testutil.Assert(t, s.chunks[0].minTime == 998 && s.chunks[0].maxTime == 999, "wrong chunk range")
	testutil.Assert(t, s.chunks[1].minTime == 1000 && s.chunks[1].maxTime == 1001, "wrong chunk range")

	// Append 999 more samples at timestamps 1002 through 2000. Intermediate
	// chunks should be cut at approximately 120 samples per chunk.
	for i := 1; i < 1000; i++ {
		ok, _ := s.append(1001+int64(i), float64(i), 0)
		testutil.Assert(t, ok, "append failed")
	}

	testutil.Assert(t, len(s.chunks) > 7, "expected intermediate chunks")

	// All chunks but the first and last should now be moderately full.
	for i, c := range s.chunks[1 : len(s.chunks)-1] {
		// i indexes the sub-slice, which starts at s.chunks[1]; add one so the
		// failure message names the chunk's real position in s.chunks.
		testutil.Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i+1, c.chunk.NumSamples())
	}
}
// TestGCChunkAccess writes two chunks for one series, obtains a chunk reader,
// truncates the head at 1500 and then verifies through that same reader that
// the first chunk is reported as not found while the second is still readable.
func TestGCChunkAccess(t *testing.T) {
	// Put a chunk, select it. GC it and then access it.
	h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
	testutil.Ok(t, err)
	defer h.Close()

	h.initTime(0)

	s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))

	// Two samples per chunk. The sample at 1000 is beyond the chunk range of
	// the first chunk, so it has to open a new one.
	appends := []struct {
		ts        int64
		wantChunk bool
	}{
		{ts: 0, wantChunk: true},
		{ts: 999, wantChunk: false},
		{ts: 1000, wantChunk: true},
		{ts: 1999, wantChunk: false},
	}
	for _, a := range appends {
		ok, chunkCreated := s.append(a.ts, float64(a.ts), 0)
		testutil.Assert(t, ok, "series append failed")
		if a.wantChunk {
			testutil.Assert(t, chunkCreated, "chunks was not created")
		} else {
			testutil.Assert(t, !chunkCreated, "chunks was created")
		}
	}

	var (
		lset  labels.Labels
		metas []chunks.Meta
	)
	idx := h.indexRange(0, 1500)
	testutil.Ok(t, idx.Series(1, &lset, &metas))

	testutil.Equals(t, labels.Labels{{Name: "a", Value: "1"}}, lset)
	testutil.Equals(t, 2, len(metas))

	// Before the truncation both chunks are readable.
	cr := h.chunksRange(0, 1500, nil)
	for _, meta := range metas {
		_, err = cr.Chunk(meta.Ref)
		testutil.Ok(t, err)
	}

	testutil.Ok(t, h.Truncate(1500)) // Remove a chunk.

	_, err = cr.Chunk(metas[0].Ref)
	testutil.Equals(t, storage.ErrNotFound, err)
	_, err = cr.Chunk(metas[1].Ref)
	testutil.Ok(t, err)
}
// TestGCSeriesAccess writes two chunks for one series, obtains a chunk reader,
// truncates the head at 2000 and then verifies that the series is gone from
// the head and that the reader reports both chunks as not found.
func TestGCSeriesAccess(t *testing.T) {
	// Put a series, select it. GC it and then access it.
	h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
	testutil.Ok(t, err)
	defer h.Close()

	h.initTime(0)

	s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))

	// Two samples per chunk. The sample at 1000 is beyond the chunk range of
	// the first chunk, so it has to open a new one.
	appends := []struct {
		ts        int64
		wantChunk bool
	}{
		{ts: 0, wantChunk: true},
		{ts: 999, wantChunk: false},
		{ts: 1000, wantChunk: true},
		{ts: 1999, wantChunk: false},
	}
	for _, a := range appends {
		ok, chunkCreated := s.append(a.ts, float64(a.ts), 0)
		testutil.Assert(t, ok, "series append failed")
		if a.wantChunk {
			testutil.Assert(t, chunkCreated, "chunks was not created")
		} else {
			testutil.Assert(t, !chunkCreated, "chunks was created")
		}
	}

	var (
		lset  labels.Labels
		metas []chunks.Meta
	)
	idx := h.indexRange(0, 2000)
	testutil.Ok(t, idx.Series(1, &lset, &metas))

	testutil.Equals(t, labels.Labels{{Name: "a", Value: "1"}}, lset)
	testutil.Equals(t, 2, len(metas))

	// Before the truncation both chunks are readable.
	cr := h.chunksRange(0, 2000, nil)
	for _, meta := range metas {
		_, err = cr.Chunk(meta.Ref)
		testutil.Ok(t, err)
	}

	testutil.Ok(t, h.Truncate(2000)) // Remove the series.

	testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1))

	// With the series removed, neither chunk can be resolved any more.
	for _, meta := range metas {
		_, err = cr.Chunk(meta.Ref)
		testutil.Equals(t, storage.ErrNotFound, err)
	}
}
// TestUncommittedSamplesNotLostOnTruncate adds a sample without committing it,
// truncates the head, and checks that the series is still present. After the
// commit, a query must return the series.
func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
	h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
	testutil.Ok(t, err)
	defer h.Close()

	h.initTime(0)

	series := labels.FromStrings("a", "1")
	app := h.appender(0, 0)
	_, err = app.Add(series, 2100, 1)
	testutil.Ok(t, err)

	// Truncate while the append is still pending.
	testutil.Ok(t, h.Truncate(2000))
	pending := h.series.getByHash(series.Hash(), series)
	testutil.Assert(t, pending != nil, "series should not have been garbage collected")

	testutil.Ok(t, app.Commit())

	q, err := NewBlockQuerier(h, 1500, 2500)
	testutil.Ok(t, err)
	defer q.Close()

	matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "1")
	ss, ws, err := q.Select(false, nil, matcher)
	testutil.Ok(t, err)
	testutil.Equals(t, 0, len(ws))

	testutil.Equals(t, true, ss.Next())
}
// TestRemoveSeriesAfterRollbackAndTruncate adds a sample without committing
// it, truncates the head, and checks that the series is still present. After
// a rollback the query must return nothing, and a second truncation must
// remove the series from the head.
func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
	h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
	testutil.Ok(t, err)
	defer h.Close()

	h.initTime(0)

	series := labels.FromStrings("a", "1")
	app := h.appender(0, 0)
	_, err = app.Add(series, 2100, 1)
	testutil.Ok(t, err)

	// Truncate while the append is still pending.
	testutil.Ok(t, h.Truncate(2000))
	pending := h.series.getByHash(series.Hash(), series)
	testutil.Assert(t, pending != nil, "series should not have been garbage collected")

	testutil.Ok(t, app.Rollback())

	q, err := NewBlockQuerier(h, 1500, 2500)
	testutil.Ok(t, err)
	defer q.Close()

	matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "1")
	ss, ws, err := q.Select(false, nil, matcher)
	testutil.Ok(t, err)
	testutil.Equals(t, 0, len(ws))

	testutil.Equals(t, false, ss.Next())

	// Truncate again, this time the series should be deleted
	testutil.Ok(t, h.Truncate(2050))
	testutil.Equals(t, (*memSeries)(nil), h.series.getByHash(series.Hash(), series))
}
// TestHead_LogRollback adds one sample for a new series, rolls the append
// back, and checks that the WAL then holds exactly one record: the series
// record. It runs once with and once without WAL compression.
func TestHead_LogRollback(t *testing.T) {
	check := func(t *testing.T, compress bool) {
		dir, err := ioutil.TempDir("", "wal_rollback")
		testutil.Ok(t, err)
		defer func() {
			testutil.Ok(t, os.RemoveAll(dir))
		}()

		w, err := wal.New(nil, nil, dir, compress)
		testutil.Ok(t, err)
		defer w.Close()

		h, err := NewHead(nil, nil, w, 1000, DefaultStripeSize)
		testutil.Ok(t, err)

		app := h.Appender()
		_, err = app.Add(labels.FromStrings("a", "b"), 1, 2)
		testutil.Ok(t, err)

		testutil.Ok(t, app.Rollback())

		recs := readTestWAL(t, w.Dir())
		testutil.Equals(t, 1, len(recs))

		got, isSeries := recs[0].([]record.RefSeries)
		testutil.Assert(t, isSeries, "expected series record but got %+v", recs[0])
		testutil.Equals(t, []record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, got)
	}

	for _, compress := range []bool{false, true} {
		compress := compress
		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
			check(t, compress)
		})
	}
}
// TestWalRepair_DecodingError ensures that a repair is run for an error
// when decoding a record.
//
// Each case writes totalRecs records to a WAL, one of them corrupted, and
// checks that initializing a head on it reports a corruption error, that
// opening the DB increments the corruption metric, and that afterwards
// exactly expRecs records can be read back from the WAL.
func TestWalRepair_DecodingError(t *testing.T) {
var enc record.Encoder
for name, test := range map[string]struct {
corrFunc func(rec []byte) []byte // Func that applies the corruption to a record.
// The intact record; it is logged for every non-corrupted position.
rec []byte
// Number of records written to the WAL, including the corrupted one.
totalRecs int
// Number of intact records written before the corrupted one; also the
// number of records expected to be readable after the repair.
expRecs int
}{
"invalid_record": {
func(rec []byte) []byte {
// Do not modify the base record because it is Logged multiple times.
res := make([]byte, len(rec))
copy(res, rec)
res[0] = byte(record.Invalid)
return res
},
enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}),
9,
5,
},
"decode_series": {
// Keep only the first three bytes of the record.
func(rec []byte) []byte {
return rec[:3]
},
enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}),
9,
5,
},
"decode_samples": {
// Keep only the first three bytes of the record.
func(rec []byte) []byte {
return rec[:3]
},
enc.Samples([]record.RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}),
9,
5,
},
"decode_tombstone": {
// Keep only the first three bytes of the record.
func(rec []byte) []byte {
return rec[:3]
},
enc.Tombstones([]tombstones.Stone{{Ref: 1, Intervals: tombstones.Intervals{}}}, []byte{}),
9,
5,
},
} {
// Run every case with and without WAL compression.
for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_repair")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
// Fill the wal and corrupt it.
{
w, err := wal.New(nil, nil, filepath.Join(dir, "wal"), compress)
testutil.Ok(t, err)
for i := 1; i <= test.totalRecs; i++ {
// At this point insert a corrupted record.
// i is 1-based, so this is the record right after expRecs intact ones.
if i-1 == test.expRecs {
testutil.Ok(t, w.Log(test.corrFunc(test.rec)))
continue
}
testutil.Ok(t, w.Log(test.rec))
}
h, err := NewHead(nil, nil, w, 1, DefaultStripeSize)
testutil.Ok(t, err)
// No corruption has been counted before the WAL is read.
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
initErr := h.Init(math.MinInt64)
err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
_, corrErr := err.(*wal.CorruptionErr)
testutil.Assert(t, corrErr, "reading the wal didn't return corruption error")
testutil.Ok(t, w.Close())
}
// Open the db to trigger a repair.
{
db, err := Open(dir, nil, nil, DefaultOptions())
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, db.Close())
}()
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal))
}
// Read the wal content after the repair.
{
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal"))
testutil.Ok(t, err)
defer sr.Close()
r := wal.NewReader(sr)
// Count the records the reader yields before it stops.
var actRec int
for r.Next() {
actRec++
}
testutil.Ok(t, r.Err())
testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records")
}
})
}
}
}
// TestNewWalSegmentOnTruncate checks that the index of the last WAL segment
// is 0 after the first append and goes up by one after each of two head
// truncations.
func TestNewWalSegmentOnTruncate(t *testing.T) {
	dir, err := ioutil.TempDir("", "test_wal_segments")
	testutil.Ok(t, err)
	defer func() {
		testutil.Ok(t, os.RemoveAll(dir))
	}()

	wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
	testutil.Ok(t, err)

	h, err := NewHead(nil, nil, wlog, 1000, DefaultStripeSize)
	testutil.Ok(t, err)
	defer h.Close()

	// appendAt commits a single sample with value 0 at the given timestamp.
	appendAt := func(ts int64) {
		app := h.Appender()
		_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts, 0)
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())
	}
	// requireLastSegment asserts the index of the last WAL segment.
	requireLastSegment := func(want int) {
		_, last, err := wlog.Segments()
		testutil.Ok(t, err)
		testutil.Equals(t, want, last)
	}

	appendAt(0)
	requireLastSegment(0)

	for _, ts := range []int64{1, 2} {
		appendAt(ts)
		testutil.Ok(t, h.Truncate(ts))
		requireLastSegment(int(ts))
	}
}
// TestAddDuplicateLabelName checks that adding a sample whose label set
// contains the same label name more than once fails, and that the error
// message names the duplicated label.
func TestAddDuplicateLabelName(t *testing.T) {
	dir, err := ioutil.TempDir("", "test_duplicate_label_name")
	testutil.Ok(t, err)
	defer func() {
		testutil.Ok(t, os.RemoveAll(dir))
	}()
	wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
	testutil.Ok(t, err)

	h, err := NewHead(nil, nil, wlog, 1000, DefaultStripeSize)
	testutil.Ok(t, err)
	defer h.Close()

	// add tries to append a sample for lset and expects it to be rejected
	// because labelName occurs more than once.
	// The parameter is deliberately not called "labels" so it does not shadow
	// the labels package, and err is local so the closure does not overwrite
	// the enclosing function's err.
	add := func(lset labels.Labels, labelName string) {
		app := h.Appender()
		_, err := app.Add(lset, 0, 0)
		testutil.NotOk(t, err)
		testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error())
	}

	add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "b"}}, "a")
	add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "c"}}, "a")
	add(labels.Labels{{Name: "__name__", Value: "up"}, {Name: "job", Value: "prometheus"}, {Name: "le", Value: "500"}, {Name: "le", Value: "400"}, {Name: "unit", Value: "s"}}, "le")
}
Make head Postings only return series in time range benchmark old ns/op new ns/op delta BenchmarkQuerierSelect/Head/1of1000000-8 405805161 120436132 -70.32% BenchmarkQuerierSelect/Head/10of1000000-8 403079620 120624292 -70.07% BenchmarkQuerierSelect/Head/100of1000000-8 404678647 120923522 -70.12% BenchmarkQuerierSelect/Head/1000of1000000-8 403145813 118636563 -70.57% BenchmarkQuerierSelect/Head/10000of1000000-8 405020046 125716206 -68.96% BenchmarkQuerierSelect/Head/100000of1000000-8 426305002 175808499 -58.76% BenchmarkQuerierSelect/Head/1000000of1000000-8 619002108 567013003 -8.40% BenchmarkQuerierSelect/SortedHead/1of1000000-8 1276316086 120281094 -90.58% BenchmarkQuerierSelect/SortedHead/10of1000000-8 1282631170 121836526 -90.50% BenchmarkQuerierSelect/SortedHead/100of1000000-8 1325824787 121174967 -90.86% BenchmarkQuerierSelect/SortedHead/1000of1000000-8 1271386268 121025117 -90.48% BenchmarkQuerierSelect/SortedHead/10000of1000000-8 1280223345 130838948 -89.78% BenchmarkQuerierSelect/SortedHead/100000of1000000-8 1271401620 243635515 -80.84% BenchmarkQuerierSelect/SortedHead/1000000of1000000-8 1360256090 1307744674 -3.86% BenchmarkQuerierSelect/Block/1of1000000-8 748183120 707888498 -5.39% BenchmarkQuerierSelect/Block/10of1000000-8 741084129 716317249 -3.34% BenchmarkQuerierSelect/Block/100of1000000-8 722157273 735624256 +1.86% BenchmarkQuerierSelect/Block/1000of1000000-8 727587744 731981838 +0.60% BenchmarkQuerierSelect/Block/10000of1000000-8 727518578 726860308 -0.09% BenchmarkQuerierSelect/Block/100000of1000000-8 765577046 757382386 -1.07% BenchmarkQuerierSelect/Block/1000000of1000000-8 1126722881 1084779083 -3.72% benchmark old allocs new allocs delta BenchmarkQuerierSelect/Head/1of1000000-8 4000018 24 -100.00% BenchmarkQuerierSelect/Head/10of1000000-8 4000036 82 -100.00% BenchmarkQuerierSelect/Head/100of1000000-8 4000216 625 -99.98% BenchmarkQuerierSelect/Head/1000of1000000-8 4002016 6028 -99.85% BenchmarkQuerierSelect/Head/10000of1000000-8 4020016 60037 
-98.51% BenchmarkQuerierSelect/Head/100000of1000000-8 4200016 600047 -85.71% BenchmarkQuerierSelect/Head/1000000of1000000-8 6000016 6000016 +0.00% BenchmarkQuerierSelect/SortedHead/1of1000000-8 4000055 28 -100.00% BenchmarkQuerierSelect/SortedHead/10of1000000-8 4000073 87 -100.00% BenchmarkQuerierSelect/SortedHead/100of1000000-8 4000253 630 -99.98% BenchmarkQuerierSelect/SortedHead/1000of1000000-8 4002053 6036 -99.85% BenchmarkQuerierSelect/SortedHead/10000of1000000-8 4020053 60054 -98.51% BenchmarkQuerierSelect/SortedHead/100000of1000000-8 4200053 600074 -85.71% BenchmarkQuerierSelect/SortedHead/1000000of1000000-8 6000053 6000053 +0.00% BenchmarkQuerierSelect/Block/1of1000000-8 6000021 6000021 +0.00% BenchmarkQuerierSelect/Block/10of1000000-8 6000057 6000057 +0.00% BenchmarkQuerierSelect/Block/100of1000000-8 6000417 6000417 +0.00% BenchmarkQuerierSelect/Block/1000of1000000-8 6004017 6004017 +0.00% BenchmarkQuerierSelect/Block/10000of1000000-8 6040017 6040017 +0.00% BenchmarkQuerierSelect/Block/100000of1000000-8 6400017 6400017 +0.00% BenchmarkQuerierSelect/Block/1000000of1000000-8 10000018 10000018 +0.00% benchmark old bytes new bytes delta BenchmarkQuerierSelect/Head/1of1000000-8 176001177 1392 -100.00% BenchmarkQuerierSelect/Head/10of1000000-8 176002329 4368 -100.00% BenchmarkQuerierSelect/Head/100of1000000-8 176013849 33520 -99.98% BenchmarkQuerierSelect/Head/1000of1000000-8 176129056 321456 -99.82% BenchmarkQuerierSelect/Head/10000of1000000-8 177281049 3427376 -98.07% BenchmarkQuerierSelect/Head/100000of1000000-8 188801049 35055408 -81.43% BenchmarkQuerierSelect/Head/1000000of1000000-8 304001059 304001049 -0.00% BenchmarkQuerierSelect/SortedHead/1of1000000-8 229192188 2488 -100.00% BenchmarkQuerierSelect/SortedHead/10of1000000-8 229193340 5568 -100.00% BenchmarkQuerierSelect/SortedHead/100of1000000-8 229204860 35536 -99.98% BenchmarkQuerierSelect/SortedHead/1000of1000000-8 229320060 345104 -99.85% BenchmarkQuerierSelect/SortedHead/10000of1000000-8 230472060 
3894672 -98.31% BenchmarkQuerierSelect/SortedHead/100000of1000000-8 241992060 40511632 -83.26% BenchmarkQuerierSelect/SortedHead/1000000of1000000-8 357192060 357192060 +0.00% BenchmarkQuerierSelect/Block/1of1000000-8 227201516 227201506 -0.00% BenchmarkQuerierSelect/Block/10of1000000-8 227203057 227203041 -0.00% BenchmarkQuerierSelect/Block/100of1000000-8 227217161 227217165 +0.00% BenchmarkQuerierSelect/Block/1000of1000000-8 227358279 227358289 +0.00% BenchmarkQuerierSelect/Block/10000of1000000-8 228769485 228769475 -0.00% BenchmarkQuerierSelect/Block/100000of1000000-8 242881487 242881477 -0.00% BenchmarkQuerierSelect/Block/1000000of1000000-8 384001705 384001705 +0.00% Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
2020-01-21 19:30:20 +00:00
// TestMemSeriesIsolation appends samples under increasing appendIDs and
// checks which sample is the last one visible to a query whose isolation
// state has a given maxAppendID, before and after appendIDs are cleaned up.
func TestMemSeriesIsolation(t *testing.T) {
hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
// lastValue queries the series foo="bar" with an isolation state whose
// maxAppendID is overridden, and returns the value of the last sample of
// the first series yielded, or -1 if no series is yielded.
lastValue := func(maxAppendID uint64) int {
idx, err := hb.Index()
testutil.Ok(t, err)
iso := hb.iso.State()
iso.maxAppendID = maxAppendID
querier := &blockQuerier{
mint: 0,
maxt: 10000,
index: idx,
chunks: hb.chunksRange(math.MinInt64, math.MaxInt64, iso),
tombstones: tombstones.NewMemTombstones(),
}
// NOTE(review): err has not been reassigned since the check above, so this
// check is redundant.
testutil.Ok(t, err)
defer querier.Close()
ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err := expandSeriesSet(ss)
testutil.Ok(t, err)
// Return from the first iteration; only one series is written by this test.
for _, series := range seriesSet {
return int(series[len(series)-1].v)
}
return -1
}
// Commit one sample per appender for i = 0..1000, using i as appendID,
// timestamp and value. i stays in scope because later appends continue
// the numbering.
i := 0
for ; i <= 1000; i++ {
var app storage.Appender
// To initialize bounds.
if hb.MinTime() == math.MaxInt64 {
app = &initAppender{head: hb, appendID: uint64(i), cleanupAppendIDsBelow: 0}
} else {
app = hb.appender(uint64(i), 0)
}
_, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
}
// Test simple cases in different chunks when no appendID cleanup has been performed.
testutil.Equals(t, 10, lastValue(10))
testutil.Equals(t, 130, lastValue(130))
testutil.Equals(t, 160, lastValue(160))
testutil.Equals(t, 240, lastValue(240))
testutil.Equals(t, 500, lastValue(500))
testutil.Equals(t, 750, lastValue(750))
testutil.Equals(t, 995, lastValue(995))
testutil.Equals(t, 999, lastValue(999))
// Cleanup appendIDs below 500.
// i is 1001 here; it is used as appendID, timestamp and value of this append.
app := hb.appender(uint64(i), 500)
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
i++
// We should not get queries with a maxAppendID below 500 after the cleanup,
// but they only take the remaining appendIDs into account.
testutil.Equals(t, 499, lastValue(10))
testutil.Equals(t, 499, lastValue(130))
testutil.Equals(t, 499, lastValue(160))
testutil.Equals(t, 499, lastValue(240))
testutil.Equals(t, 500, lastValue(500))
testutil.Equals(t, 995, lastValue(995))
testutil.Equals(t, 999, lastValue(999))
// Cleanup appendIDs below 1000, which means the sample buffer is
// the only thing with appendIDs.
app = hb.appender(uint64(i), 1000)
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, 999, lastValue(998))
testutil.Equals(t, 999, lastValue(999))
testutil.Equals(t, 1000, lastValue(1000))
testutil.Equals(t, 1001, lastValue(1001))
testutil.Equals(t, 1002, lastValue(1002))
testutil.Equals(t, 1002, lastValue(1003))
i++
// Cleanup appendIDs below 1001, but with a rollback.
// The sample added here (i is 1003) is never committed.
app = hb.appender(uint64(i), 1001)
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err)
testutil.Ok(t, app.Rollback())
testutil.Equals(t, 1000, lastValue(999))
testutil.Equals(t, 1000, lastValue(1000))
testutil.Equals(t, 1001, lastValue(1001))
testutil.Equals(t, 1002, lastValue(1002))
testutil.Equals(t, 1002, lastValue(1003))
}
// TestIsolationRollback commits one append, rolls back a second append after
// one of its samples is rejected, commits a third, and checks the isolation
// low watermark after each step.
func TestIsolationRollback(t *testing.T) {
	// Rollback after a failed append and test if the low watermark has progressed anyway.
	hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
	testutil.Ok(t, err)
	defer hb.Close()

	// addOK adds a sample for foo="bar" whose value equals its timestamp and
	// requires the add to succeed.
	addOK := func(app storage.Appender, ts int64) {
		_, err := app.Add(labels.FromStrings("foo", "bar"), ts, float64(ts))
		testutil.Ok(t, err)
	}

	// Append #1: committed.
	first := hb.Appender()
	addOK(first, 0)
	testutil.Ok(t, first.Commit())
	testutil.Equals(t, uint64(1), hb.iso.lowWatermark())

	// Append #2: the second sample repeats the label name "foo" and must be
	// rejected, after which the whole append is rolled back.
	second := hb.Appender()
	addOK(second, 1)
	_, err = second.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2)
	testutil.NotOk(t, err)
	testutil.Ok(t, second.Rollback())
	testutil.Equals(t, uint64(2), hb.iso.lowWatermark())

	// Append #3: committed.
	third := hb.Appender()
	addOK(third, 3)
	testutil.Ok(t, third.Commit())
	testutil.Equals(t, uint64(3), hb.iso.lowWatermark(), "Low watermark should proceed to 3 even if append #2 was rolled back.")
}
func TestIsolationLowWatermarkMonotonous(t *testing.T) {
hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
app1 := hb.Appender()
_, err = app1.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app1.Commit())
testutil.Equals(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.")
app1 = hb.Appender()
_, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1)
testutil.Ok(t, err)
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not commited yet.")
app2 := hb.Appender()
_, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1)
testutil.Ok(t, err)
testutil.Ok(t, app2.Commit())
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should stay two because app1 is not commited yet.")
is := hb.iso.State()
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "After simulated read (iso state retrieved), low watermark should stay at 2.")
testutil.Ok(t, app1.Commit())
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Even after app1 is commited, low watermark should stay at 2 because read is still ongoing.")
Make head Postings only return series in time range benchmark old ns/op new ns/op delta BenchmarkQuerierSelect/Head/1of1000000-8 405805161 120436132 -70.32% BenchmarkQuerierSelect/Head/10of1000000-8 403079620 120624292 -70.07% BenchmarkQuerierSelect/Head/100of1000000-8 404678647 120923522 -70.12% BenchmarkQuerierSelect/Head/1000of1000000-8 403145813 118636563 -70.57% BenchmarkQuerierSelect/Head/10000of1000000-8 405020046 125716206 -68.96% BenchmarkQuerierSelect/Head/100000of1000000-8 426305002 175808499 -58.76% BenchmarkQuerierSelect/Head/1000000of1000000-8 619002108 567013003 -8.40% BenchmarkQuerierSelect/SortedHead/1of1000000-8 1276316086 120281094 -90.58% BenchmarkQuerierSelect/SortedHead/10of1000000-8 1282631170 121836526 -90.50% BenchmarkQuerierSelect/SortedHead/100of1000000-8 1325824787 121174967 -90.86% BenchmarkQuerierSelect/SortedHead/1000of1000000-8 1271386268 121025117 -90.48% BenchmarkQuerierSelect/SortedHead/10000of1000000-8 1280223345 130838948 -89.78% BenchmarkQuerierSelect/SortedHead/100000of1000000-8 1271401620 243635515 -80.84% BenchmarkQuerierSelect/SortedHead/1000000of1000000-8 1360256090 1307744674 -3.86% BenchmarkQuerierSelect/Block/1of1000000-8 748183120 707888498 -5.39% BenchmarkQuerierSelect/Block/10of1000000-8 741084129 716317249 -3.34% BenchmarkQuerierSelect/Block/100of1000000-8 722157273 735624256 +1.86% BenchmarkQuerierSelect/Block/1000of1000000-8 727587744 731981838 +0.60% BenchmarkQuerierSelect/Block/10000of1000000-8 727518578 726860308 -0.09% BenchmarkQuerierSelect/Block/100000of1000000-8 765577046 757382386 -1.07% BenchmarkQuerierSelect/Block/1000000of1000000-8 1126722881 1084779083 -3.72% benchmark old allocs new allocs delta BenchmarkQuerierSelect/Head/1of1000000-8 4000018 24 -100.00% BenchmarkQuerierSelect/Head/10of1000000-8 4000036 82 -100.00% BenchmarkQuerierSelect/Head/100of1000000-8 4000216 625 -99.98% BenchmarkQuerierSelect/Head/1000of1000000-8 4002016 6028 -99.85% BenchmarkQuerierSelect/Head/10000of1000000-8 4020016 60037 
-98.51% BenchmarkQuerierSelect/Head/100000of1000000-8 4200016 600047 -85.71% BenchmarkQuerierSelect/Head/1000000of1000000-8 6000016 6000016 +0.00% BenchmarkQuerierSelect/SortedHead/1of1000000-8 4000055 28 -100.00% BenchmarkQuerierSelect/SortedHead/10of1000000-8 4000073 87 -100.00% BenchmarkQuerierSelect/SortedHead/100of1000000-8 4000253 630 -99.98% BenchmarkQuerierSelect/SortedHead/1000of1000000-8 4002053 6036 -99.85% BenchmarkQuerierSelect/SortedHead/10000of1000000-8 4020053 60054 -98.51% BenchmarkQuerierSelect/SortedHead/100000of1000000-8 4200053 600074 -85.71% BenchmarkQuerierSelect/SortedHead/1000000of1000000-8 6000053 6000053 +0.00% BenchmarkQuerierSelect/Block/1of1000000-8 6000021 6000021 +0.00% BenchmarkQuerierSelect/Block/10of1000000-8 6000057 6000057 +0.00% BenchmarkQuerierSelect/Block/100of1000000-8 6000417 6000417 +0.00% BenchmarkQuerierSelect/Block/1000of1000000-8 6004017 6004017 +0.00% BenchmarkQuerierSelect/Block/10000of1000000-8 6040017 6040017 +0.00% BenchmarkQuerierSelect/Block/100000of1000000-8 6400017 6400017 +0.00% BenchmarkQuerierSelect/Block/1000000of1000000-8 10000018 10000018 +0.00% benchmark old bytes new bytes delta BenchmarkQuerierSelect/Head/1of1000000-8 176001177 1392 -100.00% BenchmarkQuerierSelect/Head/10of1000000-8 176002329 4368 -100.00% BenchmarkQuerierSelect/Head/100of1000000-8 176013849 33520 -99.98% BenchmarkQuerierSelect/Head/1000of1000000-8 176129056 321456 -99.82% BenchmarkQuerierSelect/Head/10000of1000000-8 177281049 3427376 -98.07% BenchmarkQuerierSelect/Head/100000of1000000-8 188801049 35055408 -81.43% BenchmarkQuerierSelect/Head/1000000of1000000-8 304001059 304001049 -0.00% BenchmarkQuerierSelect/SortedHead/1of1000000-8 229192188 2488 -100.00% BenchmarkQuerierSelect/SortedHead/10of1000000-8 229193340 5568 -100.00% BenchmarkQuerierSelect/SortedHead/100of1000000-8 229204860 35536 -99.98% BenchmarkQuerierSelect/SortedHead/1000of1000000-8 229320060 345104 -99.85% BenchmarkQuerierSelect/SortedHead/10000of1000000-8 230472060 
3894672 -98.31% BenchmarkQuerierSelect/SortedHead/100000of1000000-8 241992060 40511632 -83.26% BenchmarkQuerierSelect/SortedHead/1000000of1000000-8 357192060 357192060 +0.00% BenchmarkQuerierSelect/Block/1of1000000-8 227201516 227201506 -0.00% BenchmarkQuerierSelect/Block/10of1000000-8 227203057 227203041 -0.00% BenchmarkQuerierSelect/Block/100of1000000-8 227217161 227217165 +0.00% BenchmarkQuerierSelect/Block/1000of1000000-8 227358279 227358289 +0.00% BenchmarkQuerierSelect/Block/10000of1000000-8 228769485 228769475 -0.00% BenchmarkQuerierSelect/Block/100000of1000000-8 242881487 242881477 -0.00% BenchmarkQuerierSelect/Block/1000000of1000000-8 384001705 384001705 +0.00% Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
2020-01-21 19:30:20 +00:00
is.Close()
testutil.Equals(t, uint64(3), hb.iso.lowWatermark(), "After read has finished (iso state closed), low watermark should jump to three.")
Make head Postings only return series in time range benchmark old ns/op new ns/op delta BenchmarkQuerierSelect/Head/1of1000000-8 405805161 120436132 -70.32% BenchmarkQuerierSelect/Head/10of1000000-8 403079620 120624292 -70.07% BenchmarkQuerierSelect/Head/100of1000000-8 404678647 120923522 -70.12% BenchmarkQuerierSelect/Head/1000of1000000-8 403145813 118636563 -70.57% BenchmarkQuerierSelect/Head/10000of1000000-8 405020046 125716206 -68.96% BenchmarkQuerierSelect/Head/100000of1000000-8 426305002 175808499 -58.76% BenchmarkQuerierSelect/Head/1000000of1000000-8 619002108 567013003 -8.40% BenchmarkQuerierSelect/SortedHead/1of1000000-8 1276316086 120281094 -90.58% BenchmarkQuerierSelect/SortedHead/10of1000000-8 1282631170 121836526 -90.50% BenchmarkQuerierSelect/SortedHead/100of1000000-8 1325824787 121174967 -90.86% BenchmarkQuerierSelect/SortedHead/1000of1000000-8 1271386268 121025117 -90.48% BenchmarkQuerierSelect/SortedHead/10000of1000000-8 1280223345 130838948 -89.78% BenchmarkQuerierSelect/SortedHead/100000of1000000-8 1271401620 243635515 -80.84% BenchmarkQuerierSelect/SortedHead/1000000of1000000-8 1360256090 1307744674 -3.86% BenchmarkQuerierSelect/Block/1of1000000-8 748183120 707888498 -5.39% BenchmarkQuerierSelect/Block/10of1000000-8 741084129 716317249 -3.34% BenchmarkQuerierSelect/Block/100of1000000-8 722157273 735624256 +1.86% BenchmarkQuerierSelect/Block/1000of1000000-8 727587744 731981838 +0.60% BenchmarkQuerierSelect/Block/10000of1000000-8 727518578 726860308 -0.09% BenchmarkQuerierSelect/Block/100000of1000000-8 765577046 757382386 -1.07% BenchmarkQuerierSelect/Block/1000000of1000000-8 1126722881 1084779083 -3.72% benchmark old allocs new allocs delta BenchmarkQuerierSelect/Head/1of1000000-8 4000018 24 -100.00% BenchmarkQuerierSelect/Head/10of1000000-8 4000036 82 -100.00% BenchmarkQuerierSelect/Head/100of1000000-8 4000216 625 -99.98% BenchmarkQuerierSelect/Head/1000of1000000-8 4002016 6028 -99.85% BenchmarkQuerierSelect/Head/10000of1000000-8 4020016 60037 
-98.51% BenchmarkQuerierSelect/Head/100000of1000000-8 4200016 600047 -85.71% BenchmarkQuerierSelect/Head/1000000of1000000-8 6000016 6000016 +0.00% BenchmarkQuerierSelect/SortedHead/1of1000000-8 4000055 28 -100.00% BenchmarkQuerierSelect/SortedHead/10of1000000-8 4000073 87 -100.00% BenchmarkQuerierSelect/SortedHead/100of1000000-8 4000253 630 -99.98% BenchmarkQuerierSelect/SortedHead/1000of1000000-8 4002053 6036 -99.85% BenchmarkQuerierSelect/SortedHead/10000of1000000-8 4020053 60054 -98.51% BenchmarkQuerierSelect/SortedHead/100000of1000000-8 4200053 600074 -85.71% BenchmarkQuerierSelect/SortedHead/1000000of1000000-8 6000053 6000053 +0.00% BenchmarkQuerierSelect/Block/1of1000000-8 6000021 6000021 +0.00% BenchmarkQuerierSelect/Block/10of1000000-8 6000057 6000057 +0.00% BenchmarkQuerierSelect/Block/100of1000000-8 6000417 6000417 +0.00% BenchmarkQuerierSelect/Block/1000of1000000-8 6004017 6004017 +0.00% BenchmarkQuerierSelect/Block/10000of1000000-8 6040017 6040017 +0.00% BenchmarkQuerierSelect/Block/100000of1000000-8 6400017 6400017 +0.00% BenchmarkQuerierSelect/Block/1000000of1000000-8 10000018 10000018 +0.00% benchmark old bytes new bytes delta BenchmarkQuerierSelect/Head/1of1000000-8 176001177 1392 -100.00% BenchmarkQuerierSelect/Head/10of1000000-8 176002329 4368 -100.00% BenchmarkQuerierSelect/Head/100of1000000-8 176013849 33520 -99.98% BenchmarkQuerierSelect/Head/1000of1000000-8 176129056 321456 -99.82% BenchmarkQuerierSelect/Head/10000of1000000-8 177281049 3427376 -98.07% BenchmarkQuerierSelect/Head/100000of1000000-8 188801049 35055408 -81.43% BenchmarkQuerierSelect/Head/1000000of1000000-8 304001059 304001049 -0.00% BenchmarkQuerierSelect/SortedHead/1of1000000-8 229192188 2488 -100.00% BenchmarkQuerierSelect/SortedHead/10of1000000-8 229193340 5568 -100.00% BenchmarkQuerierSelect/SortedHead/100of1000000-8 229204860 35536 -99.98% BenchmarkQuerierSelect/SortedHead/1000of1000000-8 229320060 345104 -99.85% BenchmarkQuerierSelect/SortedHead/10000of1000000-8 230472060 
3894672 -98.31% BenchmarkQuerierSelect/SortedHead/100000of1000000-8 241992060 40511632 -83.26% BenchmarkQuerierSelect/SortedHead/1000000of1000000-8 357192060 357192060 +0.00% BenchmarkQuerierSelect/Block/1of1000000-8 227201516 227201506 -0.00% BenchmarkQuerierSelect/Block/10of1000000-8 227203057 227203041 -0.00% BenchmarkQuerierSelect/Block/100of1000000-8 227217161 227217165 +0.00% BenchmarkQuerierSelect/Block/1000of1000000-8 227358279 227358289 +0.00% BenchmarkQuerierSelect/Block/10000of1000000-8 228769485 228769475 -0.00% BenchmarkQuerierSelect/Block/100000of1000000-8 242881487 242881477 -0.00% BenchmarkQuerierSelect/Block/1000000of1000000-8 384001705 384001705 +0.00% Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
2020-01-21 19:30:20 +00:00
}
// TestIsolationAppendIDZeroIsNoop checks that an append made with all-zero
// arguments succeeds and leaves the series' txs.txIDCount at 0.
func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
	head, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
	testutil.Ok(t, err)
	defer head.Close()

	head.initTime(0)

	// Only the series itself is needed; the second result of getOrCreate is
	// discarded.
	series, _ := head.getOrCreate(1, labels.FromStrings("a", "1"))

	// Every argument is zero; per the assertion message below this includes
	// appendID=0 (the argument order is not visible from this test).
	appended, _ := series.append(0, 0, 0)
	testutil.Assert(t, appended, "Series append failed.")
	testutil.Equals(t, 0, series.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
}
// TestHeadSeriesChunkRace runs testHeadSeriesChunkRace 1000 times back to
// back against the same *testing.T.
// NOTE(review): the repetition is presumably there to give a
// timing-dependent race more chances to show up — confirm.
func TestHeadSeriesChunkRace(t *testing.T) {
	const rounds = 1000
	for remaining := rounds; remaining > 0; remaining-- {
		testHeadSeriesChunkRace(t)
	}
}
// testHeadSeriesChunkRace performs one round of the select-versus-gc check:
// it runs q.Select on a querier backed by a head while a second goroutine
// calls h.updateMinMaxTime and h.gc on that same head, and requires that the
// select finishes without an error.
//
// The background goroutine is always waited for through a deferred wg.Wait.
// That defer is registered after the Close defers and therefore runs before
// them, so the goroutine can never still be inside h.gc when q.Close and
// h.Close execute — including when an assertion made after the goroutine was
// started ends the test early (assuming testutil.Ok aborts through
// t.FailNow/t.Fatal, which unwinds via deferred calls).
func testHeadSeriesChunkRace(t *testing.T) {
	h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize)
	testutil.Ok(t, err)
	defer h.Close()
	testutil.Ok(t, h.Init(0))

	// Write a single series with samples at timestamps 5 through 10 (value 0)
	// and commit them.
	app := h.Appender()
	s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0)
	testutil.Ok(t, err)
	for ts := int64(6); ts < 11; ts++ {
		testutil.Ok(t, app.AddFast(s2, ts, 0))
	}
	testutil.Ok(t, app.Commit())

	// The queried range 18..22 lies entirely after the written samples.
	matcher := labels.MustNewMatcher(labels.MatchEqual, "", "")
	q, err := NewBlockQuerier(h, 18, 22)
	testutil.Ok(t, err)
	defer q.Close()

	var wg sync.WaitGroup
	wg.Add(1)
	// Deferred last so it runs first: wait for the goroutine on every exit
	// path before the querier and the head are closed.
	defer wg.Wait()
	go func() {
		defer wg.Done()
		// Move the head's time range to 20..25 and garbage-collect while the
		// select below is running.
		h.updateMinMaxTime(20, 25)
		h.gc()
	}()

	// The second return value of Select is not inspected by this test.
	ss, _, err := q.Select(false, nil, matcher)
	testutil.Ok(t, err)
	testutil.Ok(t, ss.Err())
}