tsdb: Added ChunkQueryable implementations to db; unified MergeSeriesSets and vertical to single struct. (#7069)

* tsdb: Added ChunkQueryable implementations to db; unified compactor, querier and fanout block iterating.

Chained to https://github.com/prometheus/prometheus/pull/7059

* NewMerge(Chunk)Querier now takies multiple primaries allowing tsdb DB code to use it.
* Added single SeriesEntry / ChunkEntry for all series implementations.
* Unified all vertical, and non vertical for compact and querying to single
merge series / chunk sets by reusing VerticalSeriesMergeFunc for overlapping algorithm (same logic as before)
* Added block (Base/Chunk/)Querier for block querying. We then use populateAndTomb(Base/Chunk/) to iterate over chunks or samples.
* Refactored endpoint tests and querier tests to include subtests.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Addressed comments from Brian and Beorn.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed snapshot test and added chunk iterator support for DBReadOnly.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed race when iterating over Ats first.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed tests.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed populate block tests.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed endpoints test.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed test.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Added test & fixed case of head open chunk.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed DBReadOnly tests and bug producing 1 sample chunks.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Added cases for partial block overlap for multiple full chunks.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Added extra tests for chunk meta after compaction.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed small vertical merge bug and added more tests for that.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
pull/7705/head
Bartlomiej Plotka 4 years ago committed by GitHub
parent db7c5e204a
commit e6d7cc5fa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,17 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prompb
func (m Sample) T() int64 { return m.Timestamp }
func (m Sample) V() float64 { return m.Value }

@ -1062,7 +1062,6 @@ func TestSubquerySelector(t *testing.T) {
testutil.Ok(t, err)
res := qry.Exec(test.Context())
testutil.Equals(t, c.Result.Err, res.Err)
mat := res.Value.(Matrix)
sort.Sort(mat)

@ -25,11 +25,12 @@ func TestEvaluations(t *testing.T) {
testutil.Ok(t, err)
for _, fn := range files {
t.Run(fn, func(t *testing.T) {
test, err := newTestFromFile(t, fn)
testutil.Ok(t, err)
testutil.Ok(t, test.Run())
test.Close()
})
}
}

@ -96,7 +96,7 @@ func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error)
}
secondaries = append(secondaries, querier)
}
return NewMergeQuerier(primary, secondaries, ChainedSeriesMerge), nil
return NewMergeQuerier([]Querier{primary}, secondaries, ChainedSeriesMerge), nil
}
func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, error) {
@ -119,7 +119,7 @@ func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQueri
}
secondaries = append(secondaries, querier)
}
return NewMergeChunkQuerier(primary, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil
return NewMergeChunkQuerier([]ChunkQuerier{primary}, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil
}
func (f *fanout) Appender(ctx context.Context) Appender {
@ -214,38 +214,50 @@ type mergeGenericQuerier struct {
// mergeFn is used when we see series from different queriers Selects with the same labels.
mergeFn genericSeriesMergeFunc
// TODO(bwplotka): Remove once remote queries are asynchronous. False by default.
concurrentSelect bool
}
// NewMergeQuerier returns a new Querier that merges results of given primary and slice of secondary queriers.
// NewMergeQuerier returns a new Querier that merges results of given primary and secondary queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primary + secondaries Selects, merge function will be used.
func NewMergeQuerier(primary Querier, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
queriers := make([]genericQuerier, 0, len(secondaries)+1)
if primary != nil {
queriers = append(queriers, newGenericQuerierFrom(primary))
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
func NewMergeQuerier(primaries []Querier, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFrom(q))
}
for _, querier := range secondaries {
if _, ok := querier.(noopQuerier); !ok && querier != nil {
queriers = append(queriers, newSecondaryQuerierFrom(querier))
}
for _, q := range secondaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newSecondaryQuerierFrom(q))
}
}
concurrentSelect := false
if len(secondaries) > 0 {
concurrentSelect = true
}
return &querierAdapter{&mergeGenericQuerier{
mergeFn: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFn}).Merge,
queriers: queriers,
concurrentSelect: concurrentSelect,
}}
}
// NewMergeChunkQuerier returns a new ChunkQuerier that merges results of given primary and slice of secondary chunk queriers.
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primary + secondaries Selects, merge function will be used.
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primary ChunkQuerier, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
queriers := make([]genericQuerier, 0, len(secondaries)+1)
if primary != nil {
queriers = append(queriers, newGenericQuerierFromChunk(primary))
func NewMergeChunkQuerier(primaries []ChunkQuerier, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFromChunk(q))
}
}
for _, querier := range secondaries {
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
@ -253,24 +265,42 @@ func NewMergeChunkQuerier(primary ChunkQuerier, secondaries []ChunkQuerier, merg
}
}
concurrentSelect := false
if len(secondaries) > 0 {
concurrentSelect = true
}
return &chunkQuerierAdapter{&mergeGenericQuerier{
mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFn}).Merge,
queriers: queriers,
concurrentSelect: concurrentSelect,
}}
}
// Select returns a set of series that matches the given label matchers.
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
if len(q.queriers) == 0 {
return noopGenericSeriesSet{}
}
if len(q.queriers) == 1 {
return q.queriers[0].Select(sortSeries, hints, matchers...)
}
var seriesSets = make([]genericSeriesSet, 0, len(q.queriers))
if !q.concurrentSelect {
for _, querier := range q.queriers {
// We need to sort for merge to work.
seriesSets = append(seriesSets, querier.Select(true, hints, matchers...))
}
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
return s, s.Next()
}}
}
var (
seriesSets = make([]genericSeriesSet, 0, len(q.queriers))
wg sync.WaitGroup
seriesSetChan = make(chan genericSeriesSet)
)
// Schedule all Selects for all queriers we know about.
for _, querier := range q.queriers {
wg.Add(1)
@ -289,99 +319,83 @@ func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matche
for r := range seriesSetChan {
seriesSets = append(seriesSets, r)
}
return &lazySeriesSet{create: create(seriesSets, q.mergeFn)}
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
return s, s.Next()
}}
}
func create(seriesSets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) func() (genericSeriesSet, bool) {
// Returned function gets called with the first call to Next().
return func() (genericSeriesSet, bool) {
if len(seriesSets) == 1 {
return seriesSets[0], seriesSets[0].Next()
}
var h genericSeriesSetHeap
for _, set := range seriesSets {
if set == nil {
continue
}
if set.Next() {
heap.Push(&h, set)
continue
}
// When primary fails ignore results from secondaries.
// Only the primary querier returns error.
if err := set.Err(); err != nil {
return errorOnlySeriesSet{err}, false
type labelGenericQueriers []genericQuerier
func (l labelGenericQueriers) Len() int { return len(l) }
func (l labelGenericQueriers) Get(i int) LabelQuerier { return l[i] }
func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQueriers) {
i := len(l) / 2
return l[:i], l[i:]
}
// LabelValues returns all potential values for a label name.
func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
res, ws, err := q.lvals(q.queriers, name)
if err != nil {
return nil, nil, errors.Wrapf(err, "LabelValues() from merge generic querier for label %s", name)
}
set := &genericMergeSeriesSet{
mergeFunc: mergeFunc,
sets: seriesSets,
heap: h,
return res, ws, nil
}
return set, set.Next()
// lvals performs merge sort for LabelValues from multiple queriers.
func (q *mergeGenericQuerier) lvals(lq labelGenericQueriers, n string) ([]string, Warnings, error) {
if lq.Len() == 0 {
return nil, nil, nil
}
if lq.Len() == 1 {
return lq.Get(0).LabelValues(n)
}
a, b := lq.SplitByHalf()
// LabelValues returns all potential values for a label name.
func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
var (
results [][]string
warnings Warnings
)
for _, querier := range q.queriers {
values, wrn, err := querier.LabelValues(name)
if wrn != nil {
// TODO(bwplotka): We could potentially wrap warnings.
warnings = append(warnings, wrn...)
}
var ws Warnings
s1, w, err := q.lvals(a, n)
ws = append(ws, w...)
if err != nil {
return nil, nil, errors.Wrapf(err, "LabelValues() from Querier for label %s", name)
return nil, ws, err
}
results = append(results, values)
s2, ws, err := q.lvals(b, n)
ws = append(ws, w...)
if err != nil {
return nil, ws, err
}
return mergeStringSlices(results), warnings, nil
return mergeStrings(s1, s2), ws, nil
}
func mergeStringSlices(ss [][]string) []string {
switch len(ss) {
case 0:
return nil
case 1:
return ss[0]
case 2:
return mergeTwoStringSlices(ss[0], ss[1])
default:
halfway := len(ss) / 2
return mergeTwoStringSlices(
mergeStringSlices(ss[:halfway]),
mergeStringSlices(ss[halfway:]),
)
}
func mergeStrings(a, b []string) []string {
maxl := len(a)
if len(b) > len(a) {
maxl = len(b)
}
res := make([]string, 0, maxl*10/9)
func mergeTwoStringSlices(a, b []string) []string {
i, j := 0, 0
result := make([]string, 0, len(a)+len(b))
for i < len(a) && j < len(b) {
switch strings.Compare(a[i], b[j]) {
case 0:
result = append(result, a[i])
i++
j++
case -1:
result = append(result, a[i])
i++
case 1:
result = append(result, b[j])
j++
for len(a) > 0 && len(b) > 0 {
d := strings.Compare(a[0], b[0])
if d == 0 {
res = append(res, a[0])
a, b = a[1:], b[1:]
} else if d < 0 {
res = append(res, a[0])
a = a[1:]
} else if d > 0 {
res = append(res, b[0])
b = b[1:]
}
}
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
// Append all remaining elements.
res = append(res, a...)
res = append(res, b...)
return res
}
// LabelNames returns all the unique label names present in the block in sorted order.
// LabelNames returns all the unique label names present in all queriers in sorted order.
func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
var (
labelNamesMap = make(map[string]struct{})
@ -394,7 +408,7 @@ func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
warnings = append(warnings, wrn...)
}
if err != nil {
return nil, nil, errors.Wrap(err, "LabelNames() from Querier")
return nil, nil, errors.Wrap(err, "LabelNames() from merge generic querier")
}
for _, name := range names {
labelNamesMap[name] = struct{}{}
@ -412,7 +426,7 @@ func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
return labelNames, warnings, nil
}
// Close releases the resources of the Querier.
// Close releases the resources of the generic querier.
func (q *mergeGenericQuerier) Close() error {
errs := tsdb_errors.MultiError{}
for _, querier := range q.queriers {
@ -483,6 +497,9 @@ func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMe
if set.Next() {
heap.Push(&h, set)
}
if err := set.Err(); err != nil {
return errorOnlySeriesSet{err}
}
}
return &genericMergeSeriesSet{
mergeFunc: mergeFunc,
@ -583,33 +600,22 @@ func (h *genericSeriesSetHeap) Pop() interface{} {
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
// this never happens.
//
// NOTE: Use this only when you see potentially overlapping series, as this introduces small overhead to handle overlaps
// between series.
func ChainedSeriesMerge(s ...Series) Series {
if len(s) == 0 {
// NOTE: Use this merge function only when you see potentially overlapping series, as this introduces a small overhead
// to handle overlaps between series.
func ChainedSeriesMerge(series ...Series) Series {
if len(series) == 0 {
return nil
}
return &chainSeries{
labels: s[0].Labels(),
series: s,
}
}
type chainSeries struct {
labels labels.Labels
series []Series
}
func (m *chainSeries) Labels() labels.Labels {
return m.labels
}
func (m *chainSeries) Iterator() chunkenc.Iterator {
iterators := make([]chunkenc.Iterator, 0, len(m.series))
for _, s := range m.series {
return &SeriesEntry{
Lset: series[0].Labels(),
SampleIteratorFn: func() chunkenc.Iterator {
iterators := make([]chunkenc.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
}
return newChainSampleIterator(iterators)
},
}
}
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
@ -710,47 +716,32 @@ func (h *samplesIteratorHeap) Pop() interface{} {
return x
}
type compactChunkSeriesMerger struct {
mergeFunc VerticalSeriesMergeFunc
labels labels.Labels
series []ChunkSeries
}
// NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series.
// In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data.
// Samples from overlapped chunks are merged using series vertical merge func.
// It expects the same labels for each given series.
//
// NOTE: Use this only when you see potentially overlapping series, as this introduces small overhead to handle overlaps
// between series.
// NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces small a overhead
// to handle overlaps between series.
func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
return func(s ...ChunkSeries) ChunkSeries {
if len(s) == 0 {
return func(series ...ChunkSeries) ChunkSeries {
if len(series) == 0 {
return nil
}
return &compactChunkSeriesMerger{
mergeFunc: mergeFunc,
labels: s[0].Labels(),
series: s,
}
}
}
func (s *compactChunkSeriesMerger) Labels() labels.Labels {
return s.labels
}
func (s *compactChunkSeriesMerger) Iterator() chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(s.series))
for _, series := range s.series {
iterators = append(iterators, series.Iterator())
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func() chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
}
return &compactChunkIterator{
mergeFunc: s.mergeFunc,
labels: s.labels,
mergeFunc: mergeFunc,
iterators: iterators,
}
},
}
}
}
// compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
@ -758,19 +749,16 @@ func (s *compactChunkSeriesMerger) Iterator() chunks.Iterator {
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
type compactChunkIterator struct {
mergeFunc VerticalSeriesMergeFunc
labels labels.Labels
iterators []chunks.Iterator
h chunkIteratorHeap
}
func (c *compactChunkIterator) At() chunks.Meta {
if len(c.h) == 0 {
panic("compactChunkIterator.At() called after .Next() returned false.")
err error
curr chunks.Meta
}
return c.h[0].At()
func (c *compactChunkIterator) At() chunks.Meta {
return c.curr
}
func (c *compactChunkIterator) Next() bool {
@ -780,59 +768,65 @@ func (c *compactChunkIterator) Next() bool {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
if len(c.h) == 0 {
return false
}
// Detect overlaps to compact.
// Be smart about it and deduplicate on the fly if chunks are identical.
last := c.At()
var overlapped []Series
for {
iter := heap.Pop(&c.h).(chunks.Iterator)
c.curr = iter.At()
if iter.Next() {
heap.Push(&c.h, iter)
}
if len(c.h) == 0 {
break
}
// Get the current oldest chunk by min, then max time.
next := c.At()
if next.MinTime > last.MaxTime {
// No overlap with last one.
var (
overlapping []Series
oMaxTime = c.curr.MaxTime
prev = c.curr
)
// Detect overlaps to compact. Be smart about it and deduplicate on the fly if chunks are identical.
for len(c.h) > 0 {
// Get the next oldest chunk by min, then max time.
next := c.h[0].At()
if next.MinTime > oMaxTime {
// No overlap with current one.
break
}
if next.MinTime == last.MinTime &&
next.MaxTime == last.MaxTime &&
bytes.Equal(next.Chunk.Bytes(), last.Chunk.Bytes()) {
// 1:1 duplicates, skip last.
continue
if next.MinTime == prev.MinTime &&
next.MaxTime == prev.MaxTime &&
bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) {
// 1:1 duplicates, skip it.
} else {
// We operate on same series, so labels does not matter here.
overlapping = append(overlapping, newChunkToSeriesDecoder(nil, next))
if next.MaxTime > oMaxTime {
oMaxTime = next.MaxTime
}
overlapped = append(overlapped, &chunkToSeriesDecoder{
labels: c.labels,
Meta: last,
})
last = next
prev = next
}
if len(overlapped) == 0 {
return len(c.h) > 0
iter := heap.Pop(&c.h).(chunks.Iterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
}
if len(overlapping) == 0 {
return true
}
// Add last, not yet included overlap.
overlapped = append(overlapped, &chunkToSeriesDecoder{
labels: c.labels,
Meta: c.At(),
})
var chkSeries ChunkSeries = &seriesToChunkEncoder{Series: c.mergeFunc(overlapped...)}
heap.Push(&c.h, chkSeries)
// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
iter = (&seriesToChunkEncoder{Series: c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)}).Iterator()
if !iter.Next() {
if c.err = iter.Err(); c.err != nil {
return false
}
panic("unexpected seriesToChunkEncoder lack of iterations")
}
c.curr = iter.At()
if iter.Next() {
heap.Push(&c.h, iter)
}
return true
}
@ -843,6 +837,7 @@ func (c *compactChunkIterator) Err() error {
errs.Add(err)
}
}
errs.Add(c.err)
return errs.Err()
}

@ -28,35 +28,6 @@ import (
"github.com/prometheus/prometheus/util/testutil"
)
func TestMergeStringSlices(t *testing.T) {
for _, tc := range []struct {
input [][]string
expected []string
}{
{},
{[][]string{{"foo"}}, []string{"foo"}},
{[][]string{{"foo"}, {"bar"}}, []string{"bar", "foo"}},
{[][]string{{"foo"}, {"bar"}, {"baz"}}, []string{"bar", "baz", "foo"}},
} {
testutil.Equals(t, tc.expected, mergeStringSlices(tc.input))
}
}
func TestMergeTwoStringSlices(t *testing.T) {
for _, tc := range []struct {
a, b, expected []string
}{
{[]string{}, []string{}, []string{}},
{[]string{"foo"}, nil, []string{"foo"}},
{nil, []string{"bar"}, []string{"bar"}},
{[]string{"foo"}, []string{"bar"}, []string{"bar", "foo"}},
{[]string{"foo"}, []string{"bar", "baz"}, []string{"bar", "baz", "foo"}},
{[]string{"foo"}, []string{"foo"}, []string{"foo"}},
} {
testutil.Equals(t, tc.expected, mergeTwoStringSlices(tc.a, tc.b))
}
}
func TestMergeQuerierWithChainMerger(t *testing.T) {
for _, tc := range []struct {
name string
@ -215,7 +186,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
}
qs = append(qs, tc.extraQueriers...)
mergedQuerier := NewMergeQuerier(p, qs, ChainedSeriesMerge).Select(false, nil)
mergedQuerier := NewMergeQuerier([]Querier{p}, qs, ChainedSeriesMerge).Select(false, nil)
// Get all merged series upfront to make sure there are no incorrectly retained shared
// buffers causing bugs.
@ -230,8 +201,8 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
expectedSeries := tc.expected.At()
testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels())
expSmpl, expErr := ExpandSamples(expectedSeries.Iterator())
actSmpl, actErr := ExpandSamples(actualSeries.Iterator())
expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(), nil)
actSmpl, actErr := ExpandSamples(actualSeries.Iterator(), nil)
testutil.Equals(t, expErr, actErr)
testutil.Equals(t, expSmpl, actSmpl)
}
@ -391,7 +362,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
}
qs = append(qs, tc.extraQueriers...)
merged := NewMergeChunkQuerier(p, qs, NewCompactingChunkSeriesMerger(nil)).Select(false, nil)
merged := NewMergeChunkQuerier([]ChunkQuerier{p}, qs, NewCompactingChunkSeriesMerger(nil)).Select(false, nil)
for merged.Next() {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
@ -410,6 +381,103 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
}
}
func TestCompactingChunkSeriesMerger(t *testing.T) {
m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
for _, tc := range []struct {
name string
input []ChunkSeries
expected ChunkSeries
}{
{
name: "single empty series",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil),
},
{
name: "single series",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
},
{
name: "two empty series",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil),
},
{
name: "two non overlapping",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}, []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
},
{
name: "two overlapping",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{7, 7}, sample{8, 8}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
},
{
name: "two duplicated",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
},
{
name: "three overlapping",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}, sample{6, 6}}),
},
{
name: "three in chained overlap",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}, sample{6, 6}, sample{10, 10}}),
},
{
name: "three in chained overlap complex",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{0, 0}, sample{2, 2}, sample{5, 5}, sample{10, 10}, sample{15, 15}, sample{18, 18}, sample{20, 20}, sample{25, 25}, sample{26, 26}, sample{30, 30}},
[]tsdbutil.Sample{sample{31, 31}, sample{35, 35}},
),
},
} {
t.Run(tc.name, func(t *testing.T) {
merged := m(tc.input...)
testutil.Equals(t, tc.expected.Labels(), merged.Labels())
actChks, actErr := ExpandChunks(merged.Iterator())
expChks, expErr := ExpandChunks(tc.expected.Iterator())
testutil.Equals(t, expErr, actErr)
testutil.Equals(t, expChks, actChks)
})
}
}
type mockQuerier struct {
LabelQuerier
@ -543,7 +611,7 @@ func TestChainSampleIterator(t *testing.T) {
},
} {
merged := newChainSampleIterator(tc.input)
actual, err := ExpandSamples(merged)
actual, err := ExpandSamples(merged, nil)
testutil.Ok(t, err)
testutil.Equals(t, tc.expected, actual)
}
@ -586,7 +654,7 @@ func TestChainSampleIteratorSeek(t *testing.T) {
t, v := merged.At()
actual = append(actual, sample{t, v})
}
s, err := ExpandSamples(merged)
s, err := ExpandSamples(merged, nil)
testutil.Ok(t, err)
actual = append(actual, s...)
testutil.Equals(t, tc.expected, actual)
@ -620,7 +688,7 @@ func benchmarkDrain(seriesSet SeriesSet, b *testing.B) {
var err error
for n := 0; n < b.N; n++ {
for seriesSet.Next() {
result, err = ExpandSamples(seriesSet.At().Iterator())
result, err = ExpandSamples(seriesSet.At().Iterator(), nil)
testutil.Ok(b, err)
}
}

@ -16,7 +16,9 @@
package storage
import "github.com/prometheus/prometheus/pkg/labels"
import (
"github.com/prometheus/prometheus/pkg/labels"
)
type genericQuerier interface {
LabelQuerier

@ -20,7 +20,6 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tombstones"
)
// The errors exposed.
@ -161,6 +160,7 @@ type Appender interface {
// SeriesSet contains a set of series.
type SeriesSet interface {
Next() bool
// At returns full series. Returned series should be iteratable even after Next is called.
At() Series
// The error that iteration as failed with.
// When an error occurs, set cannot continue to iterate.
@ -221,6 +221,7 @@ type Series interface {
// ChunkSeriesSet contains a set of chunked series.
type ChunkSeriesSet interface {
Next() bool
// At returns full chunk series. Returned series should be iteratable even after Next is called.
At() ChunkSeries
// The error that iteration has failed with.
// When an error occurs, set cannot continue to iterate.
@ -243,20 +244,14 @@ type Labels interface {
}
type SampleIteratable interface {
// Iterator returns a new iterator of the data of the series.
// Iterator returns a new, independent iterator of the data of the series.
Iterator() chunkenc.Iterator
}
type ChunkIteratable interface {
// ChunkIterator returns a new iterator that iterates over non-overlapping chunks of the series.
// Iterator returns a new, independent iterator that iterates over potentially overlapping
// chunks of the series, sorted by min time.
Iterator() chunks.Iterator
}
// TODO(bwplotka): Remove in next Pr.
type DeprecatedChunkSeriesSet interface {
Next() bool
At() (labels.Labels, []chunks.Meta, tombstones.Intervals)
Err() error
}
type Warnings []error

@ -13,37 +13,37 @@
package storage
type lazySeriesSet struct {
create func() (s genericSeriesSet, ok bool)
// lazyGenericSeriesSet is a wrapped series set that is initialised on first call to Next().
type lazyGenericSeriesSet struct {
init func() (genericSeriesSet, bool)
set genericSeriesSet
}
func (c *lazySeriesSet) Next() bool {
func (c *lazyGenericSeriesSet) Next() bool {
if c.set != nil {
return c.set.Next()
}
var ok bool
c.set, ok = c.create()
c.set, ok = c.init()
return ok
}
func (c *lazySeriesSet) Err() error {
func (c *lazyGenericSeriesSet) Err() error {
if c.set != nil {
return c.set.Err()
}
return nil
}
func (c *lazySeriesSet) At() Labels {
func (c *lazyGenericSeriesSet) At() Labels {
if c.set != nil {
return c.set.At()
}
return nil
}
func (c *lazySeriesSet) Warnings() Warnings {
func (c *lazyGenericSeriesSet) Warnings() Warnings {
if c.set != nil {
return c.set.Warnings()
}

@ -147,7 +147,7 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie
}
queriers = append(queriers, q)
}
return storage.NewMergeQuerier(storage.NoopQuerier(), queriers, storage.ChainedSeriesMerge), nil
return storage.NewMergeQuerier(nil, queriers, storage.ChainedSeriesMerge), nil
}
// ChunkQuerier returns a storage.MergeQuerier combining the remote client queriers
@ -165,7 +165,7 @@ func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.C
}
queriers = append(queriers, q)
}
return storage.NewMergeChunkQuerier(storage.NoopChunkedQuerier(), queriers, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil
return storage.NewMergeChunkQuerier(nil, queriers, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil
}
// Appender implements storage.Storage.

@ -63,35 +63,37 @@ func (s *secondaryQuerier) LabelNames() ([]string, Warnings, error) {
return names, w, nil
}
func (s *secondaryQuerier) createFn(asyncSet genericSeriesSet) func() (genericSeriesSet, bool) {
s.asyncSets = append(s.asyncSets, asyncSet)
func (s *secondaryQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
if s.done {
panic("secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done")
}
s.asyncSets = append(s.asyncSets, s.genericQuerier.Select(sortSeries, hints, matchers...))
curr := len(s.asyncSets) - 1
return func() (genericSeriesSet, bool) {
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s.once.Do(func() {
// At first create invocation we iterate over all sets and ensure its Next() returns some value without
// At first init invocation we iterate over all async sets and ensure its Next() returns some value without
// errors. This is to ensure we support consistent partial failures.
for i, set := range s.asyncSets {
if set.Next() {
continue
}
ws := set.Warnings()
// Failed set.
if err := set.Err(); err != nil {
ws = append([]error{err}, ws...)
// Promote the warnings to the current one.
s.asyncSets[curr] = warningsOnlySeriesSet(ws)
// One of the sets failed, ensure rest of the sets returns nothing. (All or nothing logic).
// One of the sets failed, ensure current one returning errors as warnings, and rest of the sets return nothing.
// (All or nothing logic).
s.asyncSets[curr] = warningsOnlySeriesSet(append([]error{err}, ws...))
for i := range s.asyncSets {
if curr != i {
s.asyncSets[i] = noopGenericSeriesSet{}
if curr == i {
continue
}
s.asyncSets[i] = noopGenericSeriesSet{}
}
break
}
// Exhausted set.
s.asyncSets[i] = warningsOnlySeriesSet(ws)
}
s.done = true
})
@ -101,12 +103,5 @@ func (s *secondaryQuerier) createFn(asyncSet genericSeriesSet) func() (genericSe
default:
return s.asyncSets[curr], true
}
}
}
func (s *secondaryQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
if s.done {
panic("secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done")
}
return &lazySeriesSet{create: s.createFn(s.genericQuerier.Select(sortSeries, hints, matchers...))}
}}
}

@ -23,6 +23,47 @@ import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
type SeriesEntry struct {
Lset labels.Labels
SampleIteratorFn func() chunkenc.Iterator
}
func (s *SeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *SeriesEntry) Iterator() chunkenc.Iterator { return s.SampleIteratorFn() }
type ChunkSeriesEntry struct {
Lset labels.Labels
ChunkIteratorFn func() chunks.Iterator
}
func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *ChunkSeriesEntry) Iterator() chunks.Iterator { return s.ChunkIteratorFn() }
// NewListSeries returns series entry with iterator that allows to iterate over provided samples.
func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
return &SeriesEntry{
Lset: lset,
SampleIteratorFn: func() chunkenc.Iterator {
return NewListSeriesIterator(samples(s))
},
}
}
// NewListChunkSeriesIterator returns chunk series entry that allows to iterate over provided samples.
// NOTE: It uses inefficient chunks encoding implementation, not caring about chunk size.
func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sample) *ChunkSeriesEntry {
return &ChunkSeriesEntry{
Lset: lset,
ChunkIteratorFn: func() chunks.Iterator {
chks := make([]chunks.Meta, 0, len(samples))
for _, s := range samples {
chks = append(chks, tsdbutil.ChunkFromSamples(s))
}
return NewListChunkSeriesIterator(chks...)
},
}
}
type listSeriesIterator struct {
samples Samples
idx int
@ -39,7 +80,7 @@ type Samples interface {
Len() int
}
// NewListSeriesIterator returns listSeriesIterator that allows to iterate over provided samples. Does not handle overlaps.
// NewListSeriesIterator returns listSeriesIterator that allows to iterate over provided samples.
func NewListSeriesIterator(samples Samples) chunkenc.Iterator {
return &listSeriesIterator{samples: samples, idx: -1}
}
@ -71,11 +112,10 @@ func (it *listSeriesIterator) Err() error { return nil }
type listChunkSeriesIterator struct {
chks []chunks.Meta
idx int
}
// NewListChunkSeriesIterator returns listChunkSeriesIterator that allows to iterate over provided chunks. Does not handle overlaps.
// NewListChunkSeriesIterator returns listChunkSeriesIterator that allows to iterate over provided chunks.
func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator {
return &listChunkSeriesIterator{chks: chks, idx: -1}
}
@ -112,17 +152,16 @@ func (c *chunkSetToSeriesSet) Next() bool {
c.sameSeriesChunks = c.sameSeriesChunks[:0]
for iter.Next() {
c.sameSeriesChunks = append(c.sameSeriesChunks, &chunkToSeriesDecoder{
labels: c.ChunkSeriesSet.At().Labels(),
Meta: iter.At(),
})
c.sameSeriesChunks = append(
c.sameSeriesChunks,
newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), iter.At()),
)
}
if iter.Err() != nil {
c.chkIterErr = iter.Err()
return false
}
return true
}
@ -138,16 +177,15 @@ func (c *chunkSetToSeriesSet) Err() error {
return c.ChunkSeriesSet.Err()
}
type chunkToSeriesDecoder struct {
chunks.Meta
labels labels.Labels
}
func (s *chunkToSeriesDecoder) Labels() labels.Labels { return s.labels }
func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series {
return &SeriesEntry{
Lset: labels,
SampleIteratorFn: func() chunkenc.Iterator {
// TODO(bwplotka): Can we provide any chunkenc buffer?
func (s *chunkToSeriesDecoder) Iterator() chunkenc.Iterator { return s.Chunk.Iterator(nil) }
return chk.Chunk.Iterator(nil)
},
}
}
type seriesSetToChunkSet struct {
SeriesSet
@ -217,3 +255,32 @@ type errChunksIterator struct {
func (e errChunksIterator) At() chunks.Meta { return chunks.Meta{} }
func (e errChunksIterator) Next() bool { return false }
func (e errChunksIterator) Err() error { return e.err }
// ExpandSamples iterates over all samples in the iterator, buffering all in slice.
// Optionally it takes samples constructor, useful when you want to compare sample slices with different
// sample implementations. if nil, sample type from this package will be used.
func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, v float64) tsdbutil.Sample) ([]tsdbutil.Sample, error) {
if newSampleFn == nil {
newSampleFn = func(t int64, v float64) tsdbutil.Sample { return sample{t, v} }
}
var result []tsdbutil.Sample
for iter.Next() {
t, v := iter.At()
// NaNs can't be compared normally, so substitute for another value.
if math.IsNaN(v) {
v = -42
}
result = append(result, newSampleFn(t, v))
}
return result, iter.Err()
}
// ExpandChunks iterates over all chunks in the iterator, buffering all in slice.
func ExpandChunks(iter chunks.Iterator) ([]chunks.Meta, error) {
var result []chunks.Meta
for iter.Next() {
result = append(result, iter.At())
}
return result, iter.Err()
}

@ -1,84 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"math"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
type MockSeries struct {
labels labels.Labels
SampleIteratorFn func() chunkenc.Iterator
}
func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *MockSeries {
return &MockSeries{
labels: lset,
SampleIteratorFn: func() chunkenc.Iterator {
return NewListSeriesIterator(samples(s))
},
}
}
func (s *MockSeries) Labels() labels.Labels { return s.labels }
func (s *MockSeries) Iterator() chunkenc.Iterator { return s.SampleIteratorFn() }
type MockChunkSeries struct {
labels labels.Labels
ChunkIteratorFn func() chunks.Iterator
}
func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sample) *MockChunkSeries {
var chks []chunks.Meta
return &MockChunkSeries{
labels: lset,
ChunkIteratorFn: func() chunks.Iterator {
// Inefficient chunks encoding implementation, not caring about chunk size.
for _, s := range samples {
chks = append(chks, tsdbutil.ChunkFromSamples(s))
}
return NewListChunkSeriesIterator(chks...)
},
}
}
func (s *MockChunkSeries) Labels() labels.Labels { return s.labels }
func (s *MockChunkSeries) Iterator() chunks.Iterator { return s.ChunkIteratorFn() }
func ExpandSamples(iter chunkenc.Iterator) ([]tsdbutil.Sample, error) {
var result []tsdbutil.Sample
for iter.Next() {
t, v := iter.At()
// NaNs can't be compared normally, so substitute for another value.
if math.IsNaN(v) {
v = -42
}
result = append(result, sample{t, v})
}
return result, iter.Err()
}
func ExpandChunks(iter chunks.Iterator) ([]chunks.Meta, error) {
var result []chunks.Meta
for iter.Next() {
result = append(result, iter.At())
}
return result, iter.Err()
}

@ -44,10 +44,9 @@ type IndexWriter interface {
// AddSeries populates the index writer with a series and its offsets
// of chunks that the index can reference.
// Implementations may require series to be insert in increasing order by
// their labels.
// The reference numbers are used to resolve entries in postings lists that
// are added later.
// Implementations may require series to be insert in strictly increasing order by
// their labels. The reference numbers are used to resolve entries in postings lists
// that are added later.
AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error
// Close writes any finalization and closes the resources associated with

@ -16,7 +16,6 @@ package tsdb
import (
"context"
"encoding/binary"
"errors"
"hash/crc32"
"io/ioutil"
@ -32,6 +31,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal"
"github.com/prometheus/prometheus/util/testutil"
)
@ -88,21 +88,22 @@ func TestCreateBlock(t *testing.T) {
}
func TestCorruptedChunk(t *testing.T) {
for name, test := range map[string]struct {
for _, tc := range []struct {
name string
corrFunc func(f *os.File) // Func that applies the corruption.
openErr error
queryErr error
iterErr error
}{
"invalid header size": {
func(f *os.File) {
err := f.Truncate(1)
testutil.Ok(t, err)
{
name: "invalid header size",
corrFunc: func(f *os.File) {
testutil.Ok(t, f.Truncate(1))
},
errors.New("invalid segment header in segment 0: invalid size"),
nil,
openErr: errors.New("invalid segment header in segment 0: invalid size"),
},
"invalid magic number": {
func(f *os.File) {
{
name: "invalid magic number",
corrFunc: func(f *os.File) {
magicChunksOffset := int64(0)
_, err := f.Seek(magicChunksOffset, 0)
testutil.Ok(t, err)
@ -114,11 +115,11 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, chunks.MagicChunksSize, n)
},
errors.New("invalid magic number 0"),
nil,
openErr: errors.New("invalid magic number 0"),
},
"invalid chunk format version": {
func(f *os.File) {
{
name: "invalid chunk format version",
corrFunc: func(f *os.File) {
chunksFormatVersionOffset := int64(4)
_, err := f.Seek(chunksFormatVersionOffset, 0)
testutil.Ok(t, err)
@ -130,31 +131,28 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, chunks.ChunksFormatVersionSize, n)
},
errors.New("invalid chunk format version 0"),
nil,
openErr: errors.New("invalid chunk format version 0"),
},
"chunk not enough bytes to read the chunk length": {
func(f *os.File) {
{
name: "chunk not enough bytes to read the chunk length",
corrFunc: func(f *os.File) {
// Truncate one byte after the segment header.
err := f.Truncate(chunks.SegmentHeaderSize + 1)
testutil.Ok(t, err)
testutil.Ok(t, f.Truncate(chunks.SegmentHeaderSize+1))
},
nil,
errors.New("segment doesn't include enough bytes to read the chunk size data field - required:13, available:9"),
iterErr: errors.New("cannot populate chunk 8: segment doesn't include enough bytes to read the chunk size data field - required:13, available:9"),
},
"chunk not enough bytes to read the data": {
func(f *os.File) {
{
name: "chunk not enough bytes to read the data",
corrFunc: func(f *os.File) {
fi, err := f.Stat()
testutil.Ok(t, err)
err = f.Truncate(fi.Size() - 1)
testutil.Ok(t, err)
testutil.Ok(t, f.Truncate(fi.Size()-1))
},
nil,
errors.New("segment doesn't include enough bytes to read the chunk - required:26, available:25"),
iterErr: errors.New("cannot populate chunk 8: segment doesn't include enough bytes to read the chunk - required:26, available:25"),
},
"checksum mismatch": {
func(f *os.File) {
{
name: "checksum mismatch",
corrFunc: func(f *os.File) {
fi, err := f.Stat()
testutil.Ok(t, err)
@ -168,18 +166,17 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, n, 1)
},
nil,
errors.New("checksum mismatch expected:cfc0526c, actual:34815eae"),
iterErr: errors.New("cannot populate chunk 8: checksum mismatch expected:cfc0526c, actual:34815eae"),
},
} {
t.Run(name, func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_open_block_chunk_corrupted")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
series := newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{sample{1, 1}})
series := storage.NewListSeries(labels.FromStrings("a", "b"), []tsdbutil.Sample{sample{1, 1}})
blockDir := createBlock(t, tmpdir, []storage.Series{series})
files, err := sequenceFiles(chunkDir(blockDir))
testutil.Ok(t, err)
@ -189,13 +186,13 @@ func TestCorruptedChunk(t *testing.T) {
testutil.Ok(t, err)
// Apply corruption function.
test.corrFunc(f)
tc.corrFunc(f)
testutil.Ok(t, f.Close())
// Check open err.
b, err := OpenBlock(nil, blockDir, nil)
if test.openErr != nil {
testutil.Equals(t, test.openErr.Error(), err.Error())
if tc.openErr != nil {
testutil.Equals(t, tc.openErr.Error(), err.Error())
return
}
defer func() { testutil.Ok(t, b.Close()) }()
@ -205,9 +202,11 @@ func TestCorruptedChunk(t *testing.T) {
defer func() { testutil.Ok(t, querier.Close()) }()
set := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
// Check query err.
testutil.Equals(t, false, set.Next())
testutil.Equals(t, test.queryErr.Error(), set.Err().Error())
// Check chunk errors during iter time.
testutil.Assert(t, set.Next(), "")
it := set.At().Iterator()
testutil.Equals(t, false, it.Next())
testutil.Equals(t, tc.iterErr.Error(), it.Err().Error())
})
}
}
@ -306,7 +305,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(tb, err)
defer func() { testutil.Ok(tb, os.RemoveAll(chunkDir)) }()
head := createHead(tb, series, chunkDir)
head := createHead(tb, nil, series, chunkDir)
defer func() { testutil.Ok(tb, head.Close()) }()
return createBlockFromHead(tb, dir, head)
}
@ -324,8 +323,8 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
return filepath.Join(dir, ulid.String())
}
func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head {
head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize, nil)
func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head {
head, err := NewHead(nil, nil, w, DefaultBlockDuration, chunkDir, nil, DefaultStripeSize, nil)
testutil.Ok(tb, err)
app := head.Appender(context.Background())
@ -345,8 +344,7 @@ func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head {
}
testutil.Ok(tb, it.Err())
}
err = app.Commit()
testutil.Ok(tb, err)
testutil.Ok(tb, app.Commit())
return head
}
@ -373,7 +371,7 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []storage.Series {
for t := mint; t < maxt; t++ {
samples = append(samples, sample{t: t, v: rand.Float64()})
}
series[i] = newSeries(lbls, samples)
series[i] = storage.NewListSeries(labels.FromMap(lbls), samples)
}
return series
}
@ -393,7 +391,7 @@ func populateSeries(lbls []map[string]string, mint, maxt int64) []storage.Series
for t := mint; t <= maxt; t++ {
samples = append(samples, sample{t: t, v: rand.Float64()})
}
series = append(series, newSeries(lbl, samples))
series = append(series, storage.NewListSeries(labels.FromMap(lbl), samples))
}
return series
}

@ -283,85 +283,6 @@ func (w *Writer) write(b []byte) error {
return err
}
// MergeOverlappingChunks removes the samples whose timestamp is overlapping.
// The last appearing sample is retained in case there is overlapping.
// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
if len(chks) < 2 {
return chks, nil
}
newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
newChks = append(newChks, chks[0])
last := 0
for _, c := range chks[1:] {
// We need to check only the last chunk in newChks.
// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
// (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
// So never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c)
last++
continue
}
nc := &newChks[last]
if c.MaxTime > nc.MaxTime {
nc.MaxTime = c.MaxTime
}
chk, err := MergeChunks(nc.Chunk, c.Chunk)
if err != nil {
return nil, err
}
nc.Chunk = chk
}
return newChks, nil
}
// MergeChunks vertically merges a and b, i.e., if there is any sample
// with same timestamp in both a and b, the sample in a is discarded.
func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return nil, err
}
ait := a.Iterator(nil)
bit := b.Iterator(nil)
aok, bok := ait.Next(), bit.Next()
for aok && bok {
at, av := ait.At()
bt, bv := bit.At()
if at < bt {
app.Append(at, av)
aok = ait.Next()
} else if bt < at {
app.Append(bt, bv)
bok = bit.Next()
} else {
app.Append(bt, bv)
aok = ait.Next()
bok = bit.Next()
}
}
for aok {
at, av := ait.At()
app.Append(at, av)
aok = ait.Next()
}
for bok {
bt, bv := bit.At()
app.Append(bt, bv)
bok = bit.Next()
}
if ait.Err() != nil {
return nil, ait.Err()
}
if bit.Err() != nil {
return nil, bit.Err()
}
return newChunk, nil
}
// WriteChunks writes as many chunks as possible to the current segment,
// cuts a new segment when the current segment is full and
// writes the rest of the chunks in the new segment.

@ -575,7 +575,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
closers = append(closers, indexw)
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
return errors.Wrap(err, "populate block")
}
select {
@ -648,9 +648,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
var (
set storage.DeprecatedChunkSeriesSet
sets []storage.ChunkSeriesSet
symbols index.StringIter
closers = []io.Closer{}
closers []io.Closer
overlapping bool
)
defer func() {
@ -707,18 +707,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return err
}
all = indexr.SortedPostings(all)
s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1))
syms := indexr.Symbols()
if i == 0 {
set = s
symbols = syms
continue
}
set, err = newCompactionMerger(set, s)
if err != nil {
return err
}
symbols = newMergedStringIter(symbols, syms)
}
@ -731,21 +726,34 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return errors.Wrap(symbols.Err(), "next symbol")
}
delIter := &deletedIterator{}
ref := uint64(0)
var (
ref = uint64(0)
chks []chunks.Meta
)
set := sets[0]
if len(sets) > 1 {
// Merge series using compacting chunk series merger.
set = storage.NewMergeChunkSeriesSet(sets, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge))
}
// Iterate over all sorted chunk series.
for set.Next() {
select {
case <-c.ctx.Done():
return c.ctx.Err()
default:
}
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
if overlapping {
// If blocks are overlapping, it is possible to have unsorted chunks.
sort.Slice(chks, func(i, j int) bool {
return chks[i].MinTime < chks[j].MinTime
})
s := set.At()
chksIter := s.Iterator()
chks = chks[:0]
for chksIter.Next() {
// We are not iterating in streaming way over chunk as it's more efficient to do bulk write for index and
// chunk file purposes.
chks = append(chks, chksIter.At())
}
if chksIter.Err() != nil {
return errors.Wrap(chksIter.Err(), "chunk iter")
}
// Skip the series with all deleted chunks.
@ -753,85 +761,24 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
continue
}
for i, chk := range chks {
// Re-encode head chunks that are still open (being appended to) or
// outside the compacted MaxTime range.
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
// This happens when snapshotting the head block.
//
// Block time range is half-open: [meta.MinTime, meta.MaxTime) and
// chunks are closed hence the chk.MaxTime >= meta.MaxTime check.
//
// TODO think how to avoid the typecasting to verify when it is head block.
if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime {
dranges = append(dranges, tombstones.Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})
} else
// Sanity check for disk blocks.
// chk.MaxTime == meta.MaxTime shouldn't happen as well, but will brake many users so not checking for that.
if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
}
if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values.
if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
continue
}
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return err
}
delIter.it = chk.Chunk.Iterator(delIter.it)
delIter.intervals = dranges
var (
t int64
v float64
)
for delIter.Next() {
t, v = delIter.At()
app.Append(t, v)
}
if err := delIter.Err(); err != nil {
return errors.Wrap(err, "iterate chunk while re-encoding")
}
chks[i].Chunk = newChunk
chks[i].MaxTime = t
}
}
mergedChks := chks
if overlapping {
mergedChks, err = chunks.MergeOverlappingChunks(chks)
if err != nil {
return errors.Wrap(err, "merge overlapping chunks")
}
}
if err := chunkw.WriteChunks(mergedChks...); err != nil {
if err := chunkw.WriteChunks(chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
if err := indexw.AddSeries(ref, lset, mergedChks...); err != nil {
if err := indexw.AddSeries(ref, s.Labels(), chks...); err != nil {
return errors.Wrap(err, "add series")
}
meta.Stats.NumChunks += uint64(len(mergedChks))
meta.Stats.NumChunks += uint64(len(chks))
meta.Stats.NumSeries++
for _, chk := range mergedChks {
for _, chk := range chks {
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
}
for _, chk := range mergedChks {
for _, chk := range chks {
if err := c.chunkPool.Put(chk.Chunk); err != nil {
return errors.Wrap(err, "put chunk")
}
}
ref++
}
if set.Err() != nil {
@ -841,166 +788,354 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return nil
}
type compactionSeriesSet struct {
// blockBaseSeriesSet allows to iterate over all series in the single block.
// Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
type blockBaseSeriesSet struct {
p index.Postings
index IndexReader
chunks ChunkReader
tombstones tombstones.Reader
mint, maxt int64
l labels.Labels
c []chunks.Meta
intervals tombstones.Intervals
currIterFn func() *populateWithDelGenericSeriesIterator
currLabels labels.Labels
bufChks []chunks.Meta
err error
}
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings) *compactionSeriesSet {
return &compactionSeriesSet{
index: i,
chunks: c,
tombstones: t,
p: p,
func (b *blockBaseSeriesSet) Next() bool {
var lbls labels.Labels
for b.p.Next() {
if err := b.index.Series(b.p.At(), &lbls, &b.bufChks); err != nil {
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == storage.ErrNotFound {
continue
}
b.err = errors.Wrapf(err, "get series %d", b.p.At())
return false
}
func (c *compactionSeriesSet) Next() bool {
if !c.p.Next() {
return false
if len(b.bufChks) == 0 {
continue
}
var err error
c.intervals, err = c.tombstones.Get(c.p.At())
intervals, err := b.tombstones.Get(b.p.At())
if err != nil {
c.err = errors.Wrap(err, "get tombstones")
b.err = errors.Wrap(err, "get tombstones")
return false
}
if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
c.err = errors.Wrapf(err, "get series %d", c.p.At())
return false
// NOTE:
// * block time range is half-open: [meta.MinTime, meta.MaxTime).
// * chunks are both closed: [chk.MinTime, chk.MaxTime].
// * requested time ranges are closed: [req.Start, req.End].
var trimFront, trimBack bool
// Copy chunks as iteratables are reusable.
chks := make([]chunks.Meta, 0, len(b.bufChks))
// Prefilter chunks and pick those which are not entirely deleted or totally outside of the requested range.
for _, chk := range b.bufChks {
if chk.MaxTime < b.mint {
continue
}
if chk.MinTime > b.maxt {
continue
}
// Remove completely deleted chunks.
if len(c.intervals) > 0 {
chks := make([]chunks.Meta, 0, len(c.c))
for _, chk := range c.c {
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(c.intervals)) {
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(intervals)) {
chks = append(chks, chk)
}
// If still not entirely deleted, check if trim is needed based on requested time range.
if chk.MinTime < b.mint {
trimFront = true
}
if chk.MaxTime > b.maxt {
trimBack = true
}
}
if len(chks) == 0 {
continue
}
if trimFront {
intervals = intervals.Add(tombstones.Interval{Mint: math.MinInt64, Maxt: b.mint - 1})
}
if trimBack {
intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64})
}
b.currLabels = lbls
b.currIterFn = func() *populateWithDelGenericSeriesIterator {
return newPopulateWithDelGenericSeriesIterator(b.chunks, chks, intervals)
}
return true
}
return false
}
c.c = chks
func (b *blockBaseSeriesSet) Err() error {
if b.err != nil {
return b.err
}
return b.p.Err()
}
for i := range c.c {
chk := &c.c[i]
func (b *blockBaseSeriesSet) Warnings() storage.Warnings { return nil }
chk.Chunk, err = c.chunks.Chunk(chk.Ref)
if err != nil {
c.err = errors.Wrapf(err, "chunk %d not found", chk.Ref)
// populateWithDelGenericSeriesIterator allows to iterate over given chunk metas. In each iteration it ensures
// that chunks are trimmed based on given tombstones interval if any.
//
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully removed by intervals are filtered out in previous phase.
//
// On each iteration currChkMeta is available. If currDelIter is not nil, it means that chunk iterator in currChkMeta
// is invalid and chunk rewrite is needed, currDelIter should be used.
type populateWithDelGenericSeriesIterator struct {
chunks ChunkReader
// chks are expected to be sorted by minTime and should be related to the same, single series.
chks []chunks.Meta
i int
err error
bufIter *deletedIterator
intervals tombstones.Intervals
currDelIter chunkenc.Iterator
currChkMeta chunks.Meta
}
func newPopulateWithDelGenericSeriesIterator(
chunks ChunkReader,
chks []chunks.Meta,
intervals tombstones.Intervals,
) *populateWithDelGenericSeriesIterator {
return &populateWithDelGenericSeriesIterator{
chunks: chunks,
chks: chks,
i: -1,
bufIter: &deletedIterator{},
intervals: intervals,
}
}
func (p *populateWithDelGenericSeriesIterator) next() bool {
if p.err != nil || p.i >= len(p.chks)-1 {
return false
}
p.i++
p.currChkMeta = p.chks[p.i]
p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta.Ref)
if p.err != nil {
p.err = errors.Wrapf(p.err, "cannot populate chunk %d", p.currChkMeta.Ref)
return false
}
return true
p.bufIter.intervals = p.bufIter.intervals[:0]
for _, interval := range p.intervals {
if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
p.bufIter.intervals = p.bufIter.intervals.Add(interval)
}
}
func (c *compactionSeriesSet) Err() error {
if c.err != nil {
return c.err
// Re-encode head chunks that are still open (being appended to) or
// outside the compacted MaxTime range.
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
// This happens when snapshotting the head block or just fetching chunks from TSDB.
//
// TODO think how to avoid the typecasting to verify when it is head block.
_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
if len(p.bufIter.intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
// If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is.
p.currDelIter = nil
return true
}
return c.p.Err()
// We don't want full chunk or it's potentially still opened, take just part of it.
p.bufIter.it = p.currChkMeta.Chunk.Iterator(nil)
p.currDelIter = p.bufIter
return true
}
func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
return c.l, c.c, c.intervals
func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err }
func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator {
return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p}
}
func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator {
return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p}
}
type compactionMerger struct {
a, b storage.DeprecatedChunkSeriesSet
// populateWithDelSeriesIterator allows to iterate over samples for the single series.
type populateWithDelSeriesIterator struct {
*populateWithDelGenericSeriesIterator
aok, bok bool
l labels.Labels
c []chunks.Meta
intervals tombstones.Intervals
curr chunkenc.Iterator
}
// TODO(bwplotka): Move to storage mergers.
func newCompactionMerger(a, b storage.DeprecatedChunkSeriesSet) (*compactionMerger, error) {
c := &compactionMerger{
a: a,
b: b,
func (p *populateWithDelSeriesIterator) Next() bool {
if p.curr != nil && p.curr.Next() {
return true
}
for p.next() {
if p.currDelIter != nil {
p.curr = p.currDelIter
} else {
p.curr = p.currChkMeta.Chunk.Iterator(nil)
}
if p.curr.Next() {
return true
}
}
return false
}
// Initialize first elements of both sets as Next() needs
// one element look-ahead.
c.aok = c.a.Next()
c.bok = c.b.Next()
return c, c.Err()
func (p *populateWithDelSeriesIterator) Seek(t int64) bool {
if p.curr != nil && p.curr.Seek(t) {
return true
}
for p.Next() {
if p.curr.Seek(t) {
return true
}
}
return false
}
func (c *compactionMerger) compare() int {
if !c.aok {
return 1
func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() }
func (p *populateWithDelSeriesIterator) Err() error {
if err := p.populateWithDelGenericSeriesIterator.Err(); err != nil {
return err
}
if p.curr != nil {
return p.curr.Err()
}
if !c.bok {
return -1
return nil
}
a, _, _ := c.a.At()
b, _, _ := c.b.At()
return labels.Compare(a, b)
type populateWithDelChunkSeriesIterator struct {
*populateWithDelGenericSeriesIterator
curr chunks.Meta
}
func (c *compactionMerger) Next() bool {
if !c.aok && !c.bok || c.Err() != nil {
func (p *populateWithDelChunkSeriesIterator) Next() bool {
if !p.next() {
return false
}
// While advancing child iterators the memory used for labels and chunks
// may be reused. When picking a series we have to store the result.
var lset labels.Labels
var chks []chunks.Meta
d := c.compare()
if d > 0 {
lset, chks, c.intervals = c.b.At()
c.l = append(c.l[:0], lset...)
c.c = append(c.c[:0], chks...)
p.curr = p.currChkMeta
if p.currDelIter == nil {
return true
}
c.bok = c.b.Next()
} else if d < 0 {
lset, chks, c.intervals = c.a.At()
c.l = append(c.l[:0], lset...)
c.c = append(c.c[:0], chks...)
// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
p.err = err
return false
}
c.aok = c.a.Next()
} else {
// Both sets contain the current series. Chain them into a single one.
l, ca, ra := c.a.At()
_, cb, rb := c.b.At()
if !p.currDelIter.Next() {
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
for _, r := range rb {
ra = ra.Add(r)
// Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator.
p.err = errors.Wrap(err, "populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk")
return false
}
c.l = append(c.l[:0], l...)
c.c = append(append(c.c[:0], ca...), cb...)
c.intervals = ra
t, v := p.currDelIter.At()
p.curr.MinTime = t
app.Append(t, v)
c.aok = c.a.Next()
c.bok = c.b.Next()
for p.currDelIter.Next() {
t, v = p.currDelIter.At()
app.Append(t, v)
}
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
p.curr.Chunk = newChunk
p.curr.MaxTime = t
return true
}
func (c *compactionMerger) Err() error {
if c.a.Err() != nil {
return c.a.Err()
func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.curr }
// blockSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Series with no samples.
// Samples from chunks are also trimmed to requested min and max time.
type blockSeriesSet struct {
blockBaseSeriesSet
}
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet {
return &blockSeriesSet{
blockBaseSeriesSet{
index: i,
chunks: c,
tombstones: t,
p: p,
mint: mint,
maxt: maxt,
},
}
}
func (b *blockSeriesSet) At() storage.Series {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.SeriesEntry{
Lset: b.currLabels,
SampleIteratorFn: func() chunkenc.Iterator {
return currIterFn().toSeriesIterator()
},
}
return c.b.Err()
}
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
return c.l, c.c, c.intervals
// blockChunkSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Labelled iterator with no chunks.
// Chunks are also trimmed to requested [min and max] (keeping samples with min and max timestamps).
type blockChunkSeriesSet struct {
blockBaseSeriesSet
}
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet {
return &blockChunkSeriesSet{
blockBaseSeriesSet{
index: i,
chunks: c,
tombstones: t,
p: p,
mint: mint,
maxt: maxt,
},
}
}
func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.ChunkSeriesEntry{
Lset: b.currLabels,
ChunkIteratorFn: func() chunks.Iterator {
return currIterFn().toChunkSeriesIterator()
},
}
}
func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {

@ -28,8 +28,10 @@ import (
"github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/util/testutil"
)
@ -466,8 +468,23 @@ type nopChunkWriter struct{}
func (nopChunkWriter) WriteChunks(chunks ...chunks.Meta) error { return nil }
func (nopChunkWriter) Close() error { return nil }
func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sample) {
var curr []sample
for i := minTime; i <= maxTime; i++ {
curr = append(curr, sample{t: i})
if len(curr) >= maxSamplesPerChunk {
ret = append(ret, curr)
curr = []sample{}
}
}
if len(curr) > 0 {
ret = append(ret, curr)
}
return ret
}
func TestCompaction_populateBlock(t *testing.T) {
var populateBlocksCases = []struct {
for _, tc := range []struct {
title string
inputSeriesSamples [][]seriesSamples
compactMinTime int64
@ -483,11 +500,7 @@ func TestCompaction_populateBlock(t *testing.T) {
{
// Populate from single block without chunks. We expect these kind of series being ignored.
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"a": "b"},
},
},
{{lset: map[string]string{"a": "b"}}},
},
},
{
@ -546,6 +559,46 @@ func TestCompaction_populateBlock(t *testing.T) {
},
},
},
{
title: "Populate from two blocks; chunks with negative time.",
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"a": "b"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
{
lset: map[string]string{"a": "c"},
chunks: [][]sample{{{t: -11}, {t: -9}}, {{t: 10}, {t: 19}}},
},
{
// no-chunk series should be dropped.
lset: map[string]string{"a": "empty"},
},
},
{
{
lset: map[string]string{"a": "b"},
chunks: [][]sample{{{t: 21}, {t: 30}}},
},
{
lset: map[string]string{"a": "c"},
chunks: [][]sample{{{t: 40}, {t: 45}}},
},
},
},
compactMinTime: -11,
expSeriesSamples: []seriesSamples{
{
lset: map[string]string{"a": "b"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}},
},
{
lset: map[string]string{"a": "c"},
chunks: [][]sample{{{t: -11}, {t: -9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}},
},
},
},
{
title: "Populate from two blocks showing that order is maintained.",
inputSeriesSamples: [][]seriesSamples{
@ -636,8 +689,45 @@ func TestCompaction_populateBlock(t *testing.T) {
},
},
},
{
title: "Populate from two blocks 1:1 duplicated chunks; with negative timestamps.",
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"a": "1"},
chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 3}, {t: 4}}},
},
{
lset: map[string]string{"a": "2"},
chunks: [][]sample{{{t: -3}, {t: -2}}, {{t: 1}, {t: 3}, {t: 4}}, {{t: 5}, {t: 6}}},
},
},
{
{
lset: map[string]string{"a": "1"},
chunks: [][]sample{{{t: 3}, {t: 4}}},
},
{
lset: map[string]string{"a": "2"},
chunks: [][]sample{{{t: 1}, {t: 3}, {t: 4}}, {{t: 7}, {t: 8}}},
},
},
},
compactMinTime: -3,
expSeriesSamples: []seriesSamples{
{
lset: map[string]string{"a": "1"},
chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 3}, {t: 4}}},
},
{
lset: map[string]string{"a": "2"},
chunks: [][]sample{{{t: -3}, {t: -2}}, {{t: 1}, {t: 3}, {t: 4}}, {{t: 5}, {t: 6}}, {{t: 7}, {t: 8}}},
},
},
},
{
// This should not happened because head block is making sure the chunks are not crossing block boundaries.
// We used to return error, but now chunk is trimmed.
title: "Populate from single block containing chunk outside of compact meta time range.",
inputSeriesSamples: [][]seriesSamples{
{
@ -649,10 +739,15 @@ func TestCompaction_populateBlock(t *testing.T) {
},
compactMinTime: 0,
compactMaxTime: 20,
expErr: errors.New("found chunk with minTime: 10 maxTime: 30 outside of compacted minTime: 0 maxTime: 20"),
expSeriesSamples: []seriesSamples{
{
lset: map[string]string{"a": "b"},
chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}}},
},
},
},
{
// Introduced by https://github.com/prometheus/tsdb/issues/347.
// Introduced by https://github.com/prometheus/tsdb/issues/347. We used to return error, but now chunk is trimmed.
title: "Populate from single block containing extra chunk",
inputSeriesSamples: [][]seriesSamples{
{
@ -664,7 +759,12 @@ func TestCompaction_populateBlock(t *testing.T) {
},
compactMinTime: 0,
compactMaxTime: 10,
expErr: errors.New("found chunk with minTime: 10 maxTime: 20 outside of compacted minTime: 0 maxTime: 10"),
expSeriesSamples: []seriesSamples{
{
lset: map[string]string{"a": "issue347"},
chunks: [][]sample{{{t: 1}, {t: 2}}},
},
},
},
{
// Deduplication expected.
@ -693,54 +793,146 @@ func TestCompaction_populateBlock(t *testing.T) {
},
{
// Introduced by https://github.com/prometheus/tsdb/pull/539.
title: "Populate from three blocks that the last two are overlapping.",
title: "Populate from three overlapping blocks.",
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"before": "fix"},
chunks: [][]sample{{{t: 0}, {t: 10}, {t: 11}, {t: 20}}},
lset: map[string]string{"a": "overlap-all"},
chunks: [][]sample{{{t: 19}, {t: 30}}},
},
{
lset: map[string]string{"after": "fix"},
chunks: [][]sample{{{t: 0}, {t: 10}, {t: 11}, {t: 20}}},
lset: map[string]string{"a": "overlap-beginning"},
chunks: [][]sample{{{t: 0}, {t: 5}}},
},
{
lset: map[string]string{"a": "overlap-ending"},
chunks: [][]sample{{{t: 21}, {t: 30}}},
},
},
{
{
lset: map[string]string{"before": "fix"},
chunks: [][]sample{{{t: 19}, {t: 30}}},
lset: map[string]string{"a": "overlap-all"},
chunks: [][]sample{{{t: 0}, {t: 10}, {t: 11}, {t: 20}}},
},
{
lset: map[string]string{"after": "fix"},
chunks: [][]sample{{{t: 21}, {t: 30}}},
lset: map[string]string{"a": "overlap-beginning"},
chunks: [][]sample{{{t: 0}, {t: 10}, {t: 12}, {t: 20}}},
},
{
lset: map[string]string{"a": "overlap-ending"},
chunks: [][]sample{{{t: 0}, {t: 10}, {t: 13}, {t: 20}}},
},
},
{
{
lset: map[string]string{"before": "fix"},
lset: map[string]string{"a": "overlap-all"},
chunks: [][]sample{{{t: 27}, {t: 35}}},
},
{
lset: map[string]string{"after": "fix"},
lset: map[string]string{"a": "overlap-ending"},
chunks: [][]sample{{{t: 27}, {t: 35}}},
},
},
},
expSeriesSamples: []seriesSamples{
{
lset: map[string]string{"after": "fix"},
chunks: [][]sample{{{t: 0}, {t: 10}, {t: 11}, {t: 20}}, {{t: 21}, {t: 27}, {t: 30}, {t: 35}}},
lset: map[string]string{"a": "overlap-all"},
chunks: [][]sample{{{t: 0}, {t: 10}, {t: 11}, {t: 19}, {t: 20}, {t: 27}, {t: 30}, {t: 35}}},
},
{
lset: map[string]string{"a": "overlap-beginning"},
chunks: [][]sample{{{t: 0}, {t: 5}, {t: 10}, {t: 12}, {t: 20}}},
},
{
lset: map[string]string{"before": "fix"},
chunks: [][]sample{{{t: 0}, {t: 10}, {t: 11}, {t: 19}, {t: 20}, {t: 27}, {t: 30}, {t: 35}}},
lset: map[string]string{"a": "overlap-ending"},
chunks: [][]sample{{{t: 0}, {t: 10}, {t: 13}, {t: 20}}, {{t: 21}, {t: 27}, {t: 30}, {t: 35}}},
},
},
},
}
for _, tc := range populateBlocksCases {
if ok := t.Run(tc.title, func(t *testing.T) {
{
title: "Populate from three partially overlapping blocks with few full chunks.",
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"a": "1", "b": "1"},
chunks: samplesForRange(0, 659, 120), // 5 chunks and half.
},
{
lset: map[string]string{"a": "1", "b": "2"},
chunks: samplesForRange(0, 659, 120),
},
},
{
{
lset: map[string]string{"a": "1", "b": "2"},
chunks: samplesForRange(480, 1199, 120), // two chunks overlapping with previous, two non overlapping and two overlapping with next block.
},
{
lset: map[string]string{"a": "1", "b": "3"},
chunks: samplesForRange(480, 1199, 120),
},
},
{
{
lset: map[string]string{"a": "1", "b": "2"},
chunks: samplesForRange(960, 1499, 120), // 5 chunks and half.
},
{
lset: map[string]string{"a": "1", "b": "4"},
chunks: samplesForRange(960, 1499, 120),
},
},
},
expSeriesSamples: []seriesSamples{
{
lset: map[string]string{"a": "1", "b": "1"},
chunks: samplesForRange(0, 659, 120),
},
{
lset: map[string]string{"a": "1", "b": "2"},
chunks: samplesForRange(0, 1499, 120),
},
{
lset: map[string]string{"a": "1", "b": "3"},
chunks: samplesForRange(480, 1199, 120),
},
{
lset: map[string]string{"a": "1", "b": "4"},
chunks: samplesForRange(960, 1499, 120),
},
},
},
{
title: "Populate from three partially overlapping blocks with chunks that are expected to merge into single big chunks.",
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"a": "1", "b": "2"},
chunks: [][]sample{{{t: 0}, {t: 6902464}}, {{t: 6961968}, {t: 7080976}}},
},
},
{
{
lset: map[string]string{"a": "1", "b": "2"},
chunks: [][]sample{{{t: 3600000}, {t: 13953696}}, {{t: 14042952}, {t: 14221464}}},
},
},
{
{
lset: map[string]string{"a": "1", "b": "2"},
chunks: [][]sample{{{t: 10800000}, {t: 14251232}}, {{t: 14280984}, {t: 14340488}}},
},
},
},
expSeriesSamples: []seriesSamples{
{
lset: map[string]string{"a": "1", "b": "2"},
chunks: [][]sample{{{t: 0}, {t: 3600000}, {t: 6902464}, {t: 6961968}, {t: 7080976}, {t: 10800000}, {t: 13953696}, {t: 14042952}, {t: 14221464}, {t: 14251232}}, {{t: 14280984}, {t: 14340488}}},
},
},
},
} {
t.Run(tc.title, func(t *testing.T) {
blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples))
for _, b := range tc.inputSeriesSamples {
ir, cr, mint, maxt := createIdxChkReaders(t, b)
@ -767,12 +959,39 @@ func TestCompaction_populateBlock(t *testing.T) {
}
testutil.Ok(t, err)
testutil.Equals(t, tc.expSeriesSamples, iw.series)
// Check if response is expected and chunk is valid.
var raw []seriesSamples
for _, s := range iw.seriesChunks {
ss := seriesSamples{lset: s.l.Map()}
var iter chunkenc.Iterator
for _, chk := range s.chunks {
var (
samples = make([]sample, 0, chk.Chunk.NumSamples())
iter = chk.Chunk.Iterator(iter)
firstTs int64 = math.MaxInt64
s sample
)
for iter.Next() {
s.t, s.v = iter.At()
if firstTs == math.MaxInt64 {
firstTs = s.t
}
samples = append(samples, s)
}
// Check if stats are calculated properly.
s := BlockStats{
NumSeries: uint64(len(tc.expSeriesSamples)),
// Check if chunk has correct min, max times.
testutil.Equals(t, firstTs, chk.MinTime, "chunk Meta %v does not match the first encoded sample timestamp: %v", chk, firstTs)
testutil.Equals(t, s.t, chk.MaxTime, "chunk Meta %v does not match the last encoded sample timestamp %v", chk, s.t)
testutil.Ok(t, iter.Err())
ss.chunks = append(ss.chunks, samples)
}
raw = append(raw, ss)
}
testutil.Equals(t, tc.expSeriesSamples, raw)
// Check if stats are calculated properly.
s := BlockStats{NumSeries: uint64(len(tc.expSeriesSamples))}
for _, series := range tc.expSeriesSamples {
s.NumChunks += uint64(len(series.chunks))
for _, chk := range series.chunks {
@ -780,9 +999,7 @@ func TestCompaction_populateBlock(t *testing.T) {
}
}
testutil.Equals(t, s, meta.Stats)
}); !ok {
return
}
})
}
}
@ -1090,3 +1307,98 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
})
}
}
func TestBlockBaseSeriesSet(t *testing.T) {
type refdSeries struct {
lset labels.Labels
chunks []chunks.Meta
ref uint64
}
cases := []struct {
series []refdSeries
// Postings should be in the sorted order of the series
postings []uint64
expIdxs []int
}{
{
series: []refdSeries{
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}}...),
chunks: []chunks.Meta{
{Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344},
{Ref: 121},
},
ref: 12,
},
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
},
ref: 10,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "c"}}...),
chunks: []chunks.Meta{{Ref: 8282}},
ref: 1,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269},
},
ref: 108,
},
},
postings: []uint64{12, 13, 10, 108}, // 13 doesn't exist and should just be skipped over.
expIdxs: []int{0, 1, 3},
},
{
series: []refdSeries{
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
},
ref: 10,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "c"}}...),
chunks: []chunks.Meta{{Ref: 8282}},
ref: 3,
},
},
postings: []uint64{},
expIdxs: []int{},
},
}
for _, tc := range cases {
mi := newMockIndex()
for _, s := range tc.series {
testutil.Ok(t, mi.AddSeries(s.ref, s.lset, s.chunks...))
}
bcs := &blockBaseSeriesSet{
p: index.NewListPostings(tc.postings),
index: mi,
tombstones: tombstones.NewMemTombstones(),
}
i := 0
for bcs.Next() {
chks := bcs.currIterFn().chks
idx := tc.expIdxs[i]
testutil.Equals(t, tc.series[idx].lset, bcs.currLabels)
testutil.Equals(t, tc.series[idx].chunks, chks)
i++
}
testutil.Equals(t, len(tc.expIdxs), i)
testutil.Ok(t, bcs.Err())
}
}

@ -321,7 +321,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
if err != nil {
return err
}
head, err := NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize, nil)
head, err := NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, DefaultStripeSize, nil)
if err != nil {
return err
}
@ -359,9 +359,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
return errors.Wrap(err, "writing WAL")
}
// Querier loads the wal and returns a new querier over the data partition for the given time range.
// Current implementation doesn't support multiple Queriers.
func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQueryable, error) {
select {
case <-db.closed:
return nil, ErrClosed
@ -380,7 +378,7 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu
blocks[i] = b
}
head, err := NewHead(nil, db.logger, nil, 1, db.dir, nil, DefaultStripeSize, nil)
head, err := NewHead(nil, db.logger, nil, DefaultBlockDuration, db.dir, nil, DefaultStripeSize, nil)
if err != nil {
return nil, err
}
@ -398,7 +396,7 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu
if err != nil {
return nil, err
}
head, err = NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize, nil)
head, err = NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, DefaultStripeSize, nil)
if err != nil {
return nil, err
}
@ -413,24 +411,32 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu
}
db.closers = append(db.closers, head)
// TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance.
// Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite.
// Option 2: refactor Querier to use another independent func which
// can than be used by a read only and writable db instances without any code duplication.
dbWritable := &DB{
return &DB{
dir: db.dir,
logger: db.logger,
blocks: blocks,
head: head,
}, nil
}
return dbWritable.Querier(ctx, mint, maxt)
// Querier loads the blocks and wal and returns a new querier over the data partition for the given time range.
// Current implementation doesn't support multiple Queriers.
func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
q, err := db.loadDataAsQueryable(maxt)
if err != nil {
return nil, err
}
return q.Querier(ctx, mint, maxt)
}
func (db *DBReadOnly) ChunkQuerier(context.Context, int64, int64) (storage.ChunkQuerier, error) {
// TODO(bwplotka): Implement in next PR.
return nil, errors.New("not implemented")
// ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range.
// Current implementation doesn't support multiple ChunkQueriers.
func (db *DBReadOnly) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
q, err := db.loadDataAsQueryable(maxt)
if err != nil {
return nil, err
}
return q.ChunkQuerier(ctx, mint, maxt)
}
// Blocks returns a slice of block readers for persisted blocks.
@ -1330,10 +1336,8 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
}
// Querier returns a new querier over the data partition for the given time range.
// A goroutine must not handle more than one open Querier.
func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
var blocks []BlockReader
var blockMetas []BlockMeta
db.mtx.RLock()
defer db.mtx.RUnlock()
@ -1341,7 +1345,6 @@ func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, err
for _, b := range db.blocks {
if b.OverlapsClosedInterval(mint, maxt) {
blocks = append(blocks, b)
blockMetas = append(blockMetas, b.Meta())
}
}
if maxt >= db.head.MinTime() {
@ -1361,27 +1364,50 @@ func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, err
}
// If we fail, all previously opened queriers must be closed.
for _, q := range blockQueriers {
q.Close()
// TODO(bwplotka): Handle error.
_ = q.Close()
}
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil
}
if len(OverlappingBlocks(blockMetas)) > 0 {
return &verticalQuerier{
querier: querier{
blocks: blockQueriers,
},
}, nil
// ChunkQuerier returns a new chunk querier over the data partition for the given time range.
func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
var blocks []BlockReader
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
if b.OverlapsClosedInterval(mint, maxt) {
blocks = append(blocks, b)
}
}
if maxt >= db.head.MinTime() {
blocks = append(blocks, &RangeHead{
head: db.head,
mint: mint,
maxt: maxt,
})
}
return &querier{
blocks: blockQueriers,
}, nil
blockQueriers := make([]storage.ChunkQuerier, 0, len(blocks))
for _, b := range blocks {
q, err := NewBlockChunkQuerier(b, mint, maxt)
if err == nil {
blockQueriers = append(blockQueriers, q)
continue
}
// If we fail, all previously opened queriers must be closed.
for _, q := range blockQueriers {
// TODO(bwplotka): Handle error.
_ = q.Close()
}
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
func (db *DB) ChunkQuerier(context.Context, int64, int64) (storage.ChunkQuerier, error) {
// TODO(bwplotka): Implement in next PR.
return nil, errors.New("not implemented")
return storage.NewMergeChunkQuerier(blockQueriers, nil, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil
}
func rangeForTimestamp(t int64, width int64) (maxt int64) {

@ -104,6 +104,36 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
return result
}
// queryChunks runs a matcher query against the querier and fully expands its data.
func queryChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Matcher) map[string][]chunks.Meta {
ss := q.Select(false, nil, matchers...)
defer func() {
testutil.Ok(t, q.Close())
}()
result := map[string][]chunks.Meta{}
for ss.Next() {
series := ss.At()
chks := []chunks.Meta{}
it := series.Iterator()
for it.Next() {
chks = append(chks, it.At())
}
testutil.Ok(t, it.Err())
if len(chks) == 0 {
continue
}
name := series.Labels().String()
result[name] = chks
}
testutil.Ok(t, ss.Err())
testutil.Equals(t, 0, len(ss.Warnings()))
return result
}
// Ensure that blocks are held in memory in their time order
// and not in ULID order as they are read from the directory.
func TestDB_reloadOrder(t *testing.T) {
@ -387,7 +417,7 @@ Outer:
}
expss := newMockSeriesSet([]storage.Series{
newSeries(map[string]string{"a": "b"}, expSamples),
storage.NewListSeries(labels.FromStrings("a", "b"), expSamples),
})
for {
@ -403,8 +433,8 @@ Outer:
testutil.Equals(t, sexp.Labels(), sres.Labels())
smplExp, errExp := expandSeriesIterator(sexp.Iterator())
smplRes, errRes := expandSeriesIterator(sres.Iterator())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
@ -683,7 +713,7 @@ Outer:
}
expss := newMockSeriesSet([]storage.Series{
newSeries(map[string]string{"a": "b"}, expSamples),
storage.NewListSeries(labels.FromStrings("a", "b"), expSamples),
})
if len(expSamples) == 0 {
@ -704,8 +734,8 @@ Outer:
testutil.Equals(t, sexp.Labels(), sres.Labels())
smplExp, errExp := expandSeriesIterator(sexp.Iterator())
smplRes, errRes := expandSeriesIterator(sres.Iterator())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
@ -854,7 +884,7 @@ func TestDB_e2e(t *testing.T) {
for ss.Next() {
x := ss.At()
smpls, err := expandSeriesIterator(x.Iterator())
smpls, err := storage.ExpandSamples(x.Iterator(), newSample)
testutil.Ok(t, err)
if len(smpls) > 0 {
@ -1033,7 +1063,7 @@ func TestTombstoneClean(t *testing.T) {
}
expss := newMockSeriesSet([]storage.Series{
newSeries(map[string]string{"a": "b"}, expSamples),
storage.NewListSeries(labels.FromStrings("a", "b"), expSamples),
})
if len(expSamples) == 0 {
@ -1053,8 +1083,8 @@ func TestTombstoneClean(t *testing.T) {
testutil.Equals(t, sexp.Labels(), sres.Labels())
smplExp, errExp := expandSeriesIterator(sexp.Iterator())
smplRes, errRes := expandSeriesIterator(sres.Iterator())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
@ -1550,6 +1580,8 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
for i := int64(0); i < 5; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
_, err = app.Add(labels.FromStrings("blockID", strconv.FormatInt(i, 10)), i*blockRange, 0)
testutil.Ok(t, err)
}
err := app.Commit()
@ -1564,9 +1596,11 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
testutil.Ok(t, err)
defer q.Close()
// The requested interval covers 2 blocks, so the querier should contain 2 blocks.
count := len(q.(*querier).blocks)
testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
// The requested interval covers 2 blocks, so the querier's label values for blockID should give us 2 values, one from each block.
b, ws, err := q.LabelValues("blockID")
testutil.Ok(t, err)
testutil.Equals(t, storage.Warnings(nil), ws)
testutil.Equals(t, []string{"1", "2"}, b)
}
// TestInitializeHeadTimestamp ensures that the h.minTime is set properly.
@ -1930,371 +1964,6 @@ func TestCorrectNumTombstones(t *testing.T) {
testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
}
func TestVerticalCompaction(t *testing.T) {
cases := []struct {
blockSeries [][]storage.Series
expSeries map[string][]tsdbutil.Sample
expBlockNum int
expOverlappingBlocks int
}{
// Case 0
// |--------------|
// |----------------|
{
blockSeries: [][]storage.Series{
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
sample{12, 99}, sample{13, 99}, sample{14, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
sample{12, 99}, sample{13, 99}, sample{14, 99},
}},
expBlockNum: 1,
expOverlappingBlocks: 1,
},
// Case 1
// |-------------------------------|
// |----------------|
{
blockSeries: [][]storage.Series{
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
sample{11, 0}, sample{13, 0}, sample{17, 0},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 0},
sample{13, 0}, sample{17, 0},
}},
expBlockNum: 1,
expOverlappingBlocks: 1,
},
// Case 2
// |-------------------------------|
// |------------|
// |--------------------|
{
blockSeries: [][]storage.Series{
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
sample{11, 0}, sample{13, 0}, sample{17, 0},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
sample{21, 59}, sample{22, 59},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{11, 0}, sample{13, 0},
sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
sample{21, 59}, sample{22, 59},
}},
expBlockNum: 1,
expOverlappingBlocks: 1,
},
// Case 3
// |-------------------|
// |--------------------|
// |----------------|
{
blockSeries: [][]storage.Series{
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
sample{21, 59}, sample{22, 59},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
sample{16, 99}, sample{17, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{14, 59},
sample{15, 59}, sample{16, 99}, sample{17, 59}, sample{20, 59},
sample{21, 59}, sample{22, 59},
}},
expBlockNum: 1,
expOverlappingBlocks: 1,
},
// Case 4
// |-------------------------------------|
// |------------|
// |-------------------------|
{
blockSeries: [][]storage.Series{
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
sample{20, 0}, sample{22, 0},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
sample{16, 99}, sample{17, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
sample{20, 0}, sample{22, 0},
}},
expBlockNum: 1,
expOverlappingBlocks: 1,
},
// Case 5: series are merged properly when there are multiple series.
// |-------------------------------------|
// |------------|
// |-------------------------|
{
blockSeries: [][]storage.Series{
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
sample{20, 0}, sample{22, 0},
}),
newSeries(map[string]string{"b": "c"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
sample{20, 0}, sample{22, 0},
}),
newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
sample{20, 0}, sample{22, 0},
}),
},
{
newSeries(map[string]string{"__name__": "a"}, []tsdbutil.Sample{
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
}),
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
}),
newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
}),
newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
sample{16, 99}, sample{17, 99},
}),
newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
sample{16, 99}, sample{17, 99},
}),
newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
sample{16, 99}, sample{17, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{
`{__name__="a"}`: {
sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
sample{11, 59},
},
`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
sample{20, 0}, sample{22, 0},
},
`{aa="bb"}`: {
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 59},
sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
},
`{b="c"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
sample{20, 0}, sample{22, 0},
},
`{c="d"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
sample{20, 0}, sample{22, 0},
},
},
expBlockNum: 1,
expOverlappingBlocks: 1,
},
// Case 6
// |--------------|
// |----------------|
// |--------------|
// |----------------|
{
blockSeries: [][]storage.Series{
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
sample{12, 99}, sample{13, 99}, sample{14, 99},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{20, 0}, sample{21, 0}, sample{22, 0}, sample{24, 0},
sample{25, 0}, sample{27, 0}, sample{28, 0}, sample{29, 0},
}),
},
{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{23, 99}, sample{25, 99}, sample{26, 99}, sample{27, 99},
sample{28, 99}, sample{29, 99}, sample{30, 99}, sample{31, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
sample{12, 99}, sample{13, 99}, sample{14, 99},
sample{20, 0}, sample{21, 0}, sample{22, 0}, sample{23, 99},
sample{24, 0}, sample{25, 99}, sample{26, 99}, sample{27, 99},
sample{28, 99}, sample{29, 99}, sample{30, 99}, sample{31, 99},
}},
expBlockNum: 2,
expOverlappingBlocks: 2,
},
}
defaultMatcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")
for _, c := range cases {
if ok := t.Run("", func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "data")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
for _, series := range c.blockSeries {
createBlock(t, tmpdir, series)
}
opts := DefaultOptions()
opts.AllowOverlappingBlocks = true
db, err := Open(tmpdir, nil, nil, opts)
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, db.Close())
}()
db.DisableCompactions()
testutil.Assert(t, len(db.blocks) == len(c.blockSeries), "Wrong number of blocks [before compact].")
// Vertical Query Merging test.
querier, err := db.Querier(context.TODO(), 0, 100)
testutil.Ok(t, err)
actSeries := query(t, querier, defaultMatcher)
testutil.Equals(t, c.expSeries, actSeries)
// Vertical compaction.
lc := db.compactor.(*LeveledCompactor)
testutil.Equals(t, 0, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count should be still 0 here")
err = db.Compact()
testutil.Ok(t, err)
testutil.Equals(t, c.expBlockNum, len(db.Blocks()), "Wrong number of blocks [after compact]")
testutil.Equals(t, c.expOverlappingBlocks, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count mismatch")
// Query test after merging the overlapping blocks.
querier, err = db.Querier(context.TODO(), 0, 100)
testutil.Ok(t, err)
actSeries = query(t, querier, defaultMatcher)
testutil.Equals(t, c.expSeries, actSeries)
}); !ok {
return
}
}
}
// TestBlockRanges checks the following use cases:
// - No samples can be added with timestamps lower than the last block maxt.
// - The compactor doesn't create overlapping blocks
@ -2400,7 +2069,7 @@ func TestDBReadOnly(t *testing.T) {
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
expBlocks []*Block
expSeries map[string][]tsdbutil.Sample
expSeriesCount int
expChunks map[string][]chunks.Meta
expDBHash []byte
matchAll = labels.MustNewMatcher(labels.MatchEqual, "", "")
err error
@ -2416,15 +2085,21 @@ func TestDBReadOnly(t *testing.T) {
}()
dbBlocks := []*BlockMeta{
{MinTime: 10, MaxTime: 11},
{MinTime: 11, MaxTime: 12},
{MinTime: 12, MaxTime: 13},
// Create three 2-sample blocks.
{MinTime: 10, MaxTime: 12},
{MinTime: 12, MaxTime: 14},
{MinTime: 14, MaxTime: 16},
}
for _, m := range dbBlocks {
createBlock(t, dbDir, genSeries(1, 1, m.MinTime, m.MaxTime))
_ = createBlock(t, dbDir, genSeries(1, 1, m.MinTime, m.MaxTime))
}
expSeriesCount++
// Add head to test DBReadOnly WAL reading capabilities.
w, err := wal.New(logger, nil, filepath.Join(dbDir, "wal"), true)
testutil.Ok(t, err)
h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir)
testutil.Ok(t, h.Close())
}
// Open a normal db to use for a comparison.
@ -2439,7 +2114,6 @@ func TestDBReadOnly(t *testing.T) {
_, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
expSeriesCount++
expBlocks = dbWritable.Blocks()
expDbSize, err := fileutil.DirSize(dbWritable.Dir())
@ -2449,35 +2123,49 @@ func TestDBReadOnly(t *testing.T) {
q, err := dbWritable.Querier(context.TODO(), math.MinInt64, math.MaxInt64)
testutil.Ok(t, err)
expSeries = query(t, q, matchAll)
cq, err := dbWritable.ChunkQuerier(context.TODO(), math.MinInt64, math.MaxInt64)
testutil.Ok(t, err)
expChunks = queryChunks(t, cq, matchAll)
testutil.Ok(t, dbWritable.Close()) // Close here to allow getting the dir hash for windows.
expDBHash = testutil.DirHash(t, dbWritable.Dir())
}
// Open a read only db and ensure that the API returns the same result as the normal DB.
{
dbReadOnly, err := OpenDBReadOnly(dbDir, logger)
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, dbReadOnly.Close())
}()
defer func() { testutil.Ok(t, dbReadOnly.Close()) }()
t.Run("blocks", func(t *testing.T) {
blocks, err := dbReadOnly.Blocks()
testutil.Ok(t, err)
testutil.Equals(t, len(expBlocks), len(blocks))
for i, expBlock := range expBlocks {
testutil.Equals(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch")
}
})
t.Run("querier", func(t *testing.T) {
// Open a read only db and ensure that the API returns the same result as the normal DB.
q, err := dbReadOnly.Querier(context.TODO(), math.MinInt64, math.MaxInt64)
testutil.Ok(t, err)
readOnlySeries := query(t, q, matchAll)
readOnlyDBHash := testutil.DirHash(t, dbDir)
testutil.Equals(t, expSeriesCount, len(readOnlySeries), "total series mismatch")
testutil.Equals(t, len(expSeries), len(readOnlySeries), "total series mismatch")
testutil.Equals(t, expSeries, readOnlySeries, "series mismatch")
testutil.Equals(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same")
}
})
t.Run("chunk querier", func(t *testing.T) {
cq, err := dbReadOnly.ChunkQuerier(context.TODO(), math.MinInt64, math.MaxInt64)
testutil.Ok(t, err)
readOnlySeries := queryChunks(t, cq, matchAll)
readOnlyDBHash := testutil.DirHash(t, dbDir)
testutil.Equals(t, len(expChunks), len(readOnlySeries), "total series mismatch")
testutil.Equals(t, expChunks, readOnlySeries, "series chunks mismatch")
testutil.Equals(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same")
})
}
// TestDBReadOnlyClosing ensures that after closing the db

@ -37,6 +37,7 @@ import (
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal"
"go.uber.org/atomic"
)
@ -1928,13 +1929,9 @@ type sample struct {
v float64
}
func (s sample) T() int64 {
return s.t
}
func (s sample) V() float64 {
return s.v
}
func newSample(t int64, v float64) tsdbutil.Sample { return sample{t, v} }
func (s sample) T() int64 { return s.t }
func (s sample) V() float64 { return s.v }
// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it.

@ -577,7 +577,7 @@ func TestHeadDeleteSimple(t *testing.T) {
actSeriesSet := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
testutil.Ok(t, q.Close())
expSeriesSet := newMockSeriesSet([]storage.Series{
newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample {
storage.NewListSeries(labels.Labels{lblDefault}, func() []tsdbutil.Sample {
ss := make([]tsdbutil.Sample, 0, len(c.smplsExp))
for _, s := range c.smplsExp {
ss = append(ss, s)
@ -602,8 +602,8 @@ func TestHeadDeleteSimple(t *testing.T) {
testutil.Equals(t, expSeries.Labels(), actSeries.Labels())
smplExp, errExp := expandSeriesIterator(expSeries.Iterator())
smplRes, errRes := expandSeriesIterator(actSeries.Iterator())
smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(), nil)
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
@ -655,7 +655,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
testutil.Assert(t, res.Next(), "series don't exist")
exps := res.At()
it = exps.Iterator()
resSamples, err := expandSeriesIterator(it)
resSamples, err := storage.ExpandSamples(it, newSample)
testutil.Ok(t, err)
testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, resSamples)
for res.Next() {
@ -828,12 +828,9 @@ func TestDelete_e2e(t *testing.T) {
smpls = deletedSamples(smpls, del.drange)
// Only append those series for which samples exist as mockSeriesSet
// doesn't skip series with no samples.
// TODO: But sometimes SeriesSet returns an empty SeriesIterator
// TODO: But sometimes SeriesSet returns an empty chunkenc.Iterator
if len(smpls) > 0 {
matchedSeries = append(matchedSeries, newSeries(
m.Map(),
smpls,
))
matchedSeries = append(matchedSeries, storage.NewListSeries(m, smpls))
}
}
expSs := newMockSeriesSet(matchedSeries)
@ -856,8 +853,8 @@ func TestDelete_e2e(t *testing.T) {
sexp := expSs.At()
sres := ss.At()
testutil.Equals(t, sexp.Labels(), sres.Labels())
smplExp, errExp := expandSeriesIterator(sexp.Iterator())
smplRes, errRes := expandSeriesIterator(sres.Iterator())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
testutil.Equals(t, errExp, errRes)
testutil.Equals(t, smplExp, smplRes)
}
@ -1417,7 +1414,6 @@ func TestAddDuplicateLabelName(t *testing.T) {
func TestMemSeriesIsolation(t *testing.T) {
// Put a series, select it. GC it and then access it.
lastValue := func(h *Head, maxAppendID uint64) int {
idx, err := h.Index()
@ -1428,12 +1424,16 @@ func TestMemSeriesIsolation(t *testing.T) {
chunks, err := h.chunksRange(math.MinInt64, math.MaxInt64, iso)
testutil.Ok(t, err)
querier := &blockQuerier{
mint: 0,
maxt: 10000,
// Hm.. here direct block chunk querier might be required?
querier := blockQuerier{
blockBaseQuerier: &blockBaseQuerier{
index: idx,
chunks: chunks,
tombstones: tombstones.NewMemTombstones(),
mint: 0,
maxt: 10000,
},
}
testutil.Ok(t, err)

@ -14,6 +14,7 @@
package tsdb
import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -21,47 +22,34 @@ import (
)
type mockIndexWriter struct {
series []seriesSamples
seriesChunks []series
}
func (mockIndexWriter) AddSymbol(sym string) error { return nil }
func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error {
i := -1
for j, s := range m.series {
if !labels.Equal(labels.FromMap(s.lset), l) {
continue
func copyChunk(c chunkenc.Chunk) (chunkenc.Chunk, error) {
b := c.Bytes()
nb := make([]byte, len(b))
copy(nb, b)
return chunkenc.FromData(c.Encoding(), nb)
}
i = j
break
}
if i == -1 {
m.series = append(m.series, seriesSamples{
lset: l.Map(),
})
i = len(m.series) - 1
}
var iter chunkenc.Iterator
for _, chk := range chunks {
samples := make([]sample, 0, chk.Chunk.NumSamples())
iter = chk.Chunk.Iterator(iter)
for iter.Next() {
s := sample{}
s.t, s.v = iter.At()
samples = append(samples, s)
func (mockIndexWriter) AddSymbol(string) error { return nil }
func (m *mockIndexWriter) AddSeries(_ uint64, l labels.Labels, chks ...chunks.Meta) error {
// Copy chunks as their bytes are pooled.
chksNew := make([]chunks.Meta, len(chks))
for i, chk := range chks {
c, err := copyChunk(chk.Chunk)
if err != nil {
return errors.Wrap(err, "mockIndexWriter: copy chunk")
}
if err := iter.Err(); err != nil {
return err
chksNew[i] = chunks.Meta{MaxTime: chk.MaxTime, MinTime: chk.MinTime, Chunk: c}
}
m.series[i].chunks = append(m.series[i].chunks, samples)
}
// We don't combine multiple same series together, by design as `AddSeries` requires full series to be saved.
m.seriesChunks = append(m.seriesChunks, series{l: l, chunks: chksNew})
return nil
}
func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil }
func (mockIndexWriter) WriteLabelIndex([]string, []string) error { return nil }
func (mockIndexWriter) Close() error { return nil }
type mockBReader struct {

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -18,23 +18,37 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
)
type Samples interface {
Get(i int) Sample
Len() int
}
type Sample interface {
T() int64
V() float64
}
type SampleSlice []Sample
func (s SampleSlice) Get(i int) Sample { return s[i] }
func (s SampleSlice) Len() int { return len(s) }
func ChunkFromSamples(s []Sample) chunks.Meta {
return ChunkFromSamplesGeneric(SampleSlice(s))
}
func ChunkFromSamplesGeneric(s Samples) chunks.Meta {
mint, maxt := int64(0), int64(0)
if len(s) > 0 {
mint, maxt = s[0].T(), s[len(s)-1].T()
if s.Len() > 0 {
mint, maxt = s.Get(0).T(), s.Get(s.Len()-1).T()
}
c := chunkenc.NewXORChunk()
ca, _ := c.Appender()
for _, s := range s {
ca.Append(s.T(), s.V())
for i := 0; i < s.Len(); i++ {
ca.Append(s.Get(i).T(), s.Get(i).V())
}
return chunks.Meta{
MinTime: mint,

@ -172,8 +172,7 @@ type TSDBAdminStats interface {
// API can register a set of endpoints in a router and handle
// them using the provided storage and query engine.
type API struct {
// TODO(bwplotka): Change to SampleAndChunkQueryable in next PR.
Queryable storage.Queryable
Queryable storage.SampleAndChunkQueryable
QueryEngine *promql.Engine
targetRetriever func(context.Context) TargetRetriever
@ -502,6 +501,9 @@ func (api *API) labelNames(r *http.Request) apiFuncResult {
if err != nil {
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil}
}
if names == nil {
names = []string{}
}
return apiFuncResult{names, nil, warnings, nil}
}
@ -542,6 +544,9 @@ func (api *API) labelValues(r *http.Request) (result apiFuncResult) {
if err != nil {
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, closer}
}
if vals == nil {
vals = []string{}
}
return apiFuncResult{vals, nil, warnings, closer}
}
@ -1218,15 +1223,21 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
case prompb.ReadRequest_STREAMED_XOR_CHUNKS:
api.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedExternalLabels)
default:
// On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response.
api.remoteReadSamples(ctx, w, req, externalLabels, sortedExternalLabels)
}
}
func (api *API) remoteReadSamples(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) {
func (api *API) remoteReadSamples(
ctx context.Context,
w http.ResponseWriter,
req *prompb.ReadRequest,
externalLabels map[string]string,
sortedExternalLabels []prompb.Label,
) {
w.Header().Set("Content-Type", "application/x-protobuf")
w.Header().Set("Content-Encoding", "snappy")
// On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response.
resp := prompb.ReadResponse{
Results: make([]*prompb.QueryResult, len(req.Queries)),
}
@ -1265,15 +1276,12 @@ func (api *API) remoteReadSamples(ctx context.Context, w http.ResponseWriter, re
if err != nil {
return err
}
for _, w := range ws {
level.Warn(api.logger).Log("msg", "Warnings on remote read query", "err", w.Error())
}
for _, ts := range resp.Results[i].Timeseries {
ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels)
}
return nil
}(); err != nil {
if httpErr, ok := err.(remote.HTTPError); ok {
@ -1307,14 +1315,13 @@ func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.Response
return err
}
// TODO(bwplotka): Use ChunkQuerier once ready in tsdb package.
querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
querier, err := api.Queryable.ChunkQuerier(ctx, query.StartTimestampMs, query.EndTimestampMs)
if err != nil {
return err
}
defer func() {
if err := querier.Close(); err != nil {
level.Warn(api.logger).Log("msg", "Error on chunk querier close", "warnings", err.Error())
level.Warn(api.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
}
}()
@ -1331,7 +1338,7 @@ func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.Response
}
}
ws, err := remote.DeprecatedStreamChunkedReadResponses(
ws, err := remote.StreamChunkedReadResponses(
remote.NewChunkedWriter(w, f),
int64(i),
// The streaming API has to provide the series sorted.

@ -26,6 +26,7 @@ import (
"net/url"
"os"
"reflect"
"runtime"
"sort"
"strings"
"testing"
@ -1702,13 +1703,14 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI
}
for i, test := range tests {
t.Run(fmt.Sprintf("run %d %s %q", i, describeAPIFunc(test.endpoint), test.query.Encode()), func(t *testing.T) {
for _, method := range methods(test.endpoint) {
t.Run(method, func(t *testing.T) {
// Build a context with the correct request params.
ctx := context.Background()
for p, v := range test.params {
ctx = route.WithParam(ctx, p, v)
}
t.Logf("run %d\t%s\t%q", i, method, test.query.Encode())
req, err := request(method, test.query)
if err != nil {
@ -1732,8 +1734,15 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI
} else {
assertAPIResponse(t, res.data, test.response)
}
})
}
})
}
}
func describeAPIFunc(f apiFunc) string {
name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
return strings.Split(name[strings.LastIndex(name, ".")+1:], "-")[0]
}
func assertAPIError(t *testing.T, got *apiError, exp errorType) {

Loading…
Cancel
Save