storage: Adjusted fully storage layer support for chunk iterators: Remote read client, readyStorage, fanout. (#7059)

* Fixed nits introduced by https://github.com/prometheus/prometheus/pull/7334
* Added ChunkQueryable implementation to fanout and readyStorage.
* Added more comments.
* Changed NewVerticalChunkSeriesMerger to CompactingChunkSeriesMerger, removed tiny interface by reusing VerticalSeriesMergeFunc for overlapping algorithm for
both chunks and series, for both querying and compacting (!) + made sure duplicates are merged.
* Added ErrChunkSeriesSet
* Added Samples interface for seamless []promb.Sample to []tsdbutil.Sample conversion.
* Deprecating non chunks serieset based StreamChunkedReadResponses, added chunk one.
* Improved tests.
* Split remote client into Write (old storage) and read.
* Queryable client is now SampleAndChunkQueryable. Since we cannot use nice QueryableFunc I moved
all config based options to sampleAndChunkQueryableClient to aboid boilerplate.

In next commit: Changes for TSDB.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
pull/7461/head
Bartlomiej Plotka 2020-06-24 15:41:52 +02:00 committed by GitHub
parent b41adab735
commit b788986717
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1255 additions and 736 deletions

View File

@ -961,6 +961,14 @@ func (s *readyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Q
return nil, tsdb.ErrNotReady return nil, tsdb.ErrNotReady
} }
// ChunkQuerier implements the Storage interface.
func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
if x := s.get(); x != nil {
return x.ChunkQuerier(ctx, mint, maxt)
}
return nil, tsdb.ErrNotReady
}
// Appender implements the Storage interface. // Appender implements the Storage interface.
func (s *readyStorage) Appender() storage.Appender { func (s *readyStorage) Appender() storage.Appender {
if x := s.get(); x != nil { if x := s.get(); x != nil {

View File

@ -17,7 +17,6 @@ import (
"math/rand" "math/rand"
"testing" "testing"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -105,7 +104,7 @@ func TestBufferedSeriesIterator(t *testing.T) {
testutil.Equals(t, ev, v, "value mismatch") testutil.Equals(t, ev, v, "value mismatch")
} }
it = NewBufferIterator(NewListSeriesIterator([]tsdbutil.Sample{ it = NewBufferIterator(NewListSeriesIterator(samples{
sample{t: 1, v: 2}, sample{t: 1, v: 2},
sample{t: 2, v: 3}, sample{t: 2, v: 3},
sample{t: 3, v: 4}, sample{t: 3, v: 4},

View File

@ -14,6 +14,7 @@
package storage package storage
import ( import (
"bytes"
"container/heap" "container/heap"
"context" "context"
"sort" "sort"
@ -24,7 +25,6 @@ import (
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
@ -94,12 +94,34 @@ func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error)
} }
return nil, errs.Err() return nil, errs.Err()
} }
secondaries = append(secondaries, querier) secondaries = append(secondaries, querier)
} }
return NewMergeQuerier(primary, secondaries, ChainedSeriesMerge), nil return NewMergeQuerier(primary, secondaries, ChainedSeriesMerge), nil
} }
func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, error) {
primary, err := f.primary.ChunkQuerier(ctx, mint, maxt)
if err != nil {
return nil, err
}
secondaries := make([]ChunkQuerier, 0, len(f.secondaries))
for _, storage := range f.secondaries {
querier, err := storage.ChunkQuerier(ctx, mint, maxt)
if err != nil {
// Close already open Queriers, append potential errors to returned error.
errs := tsdb_errors.MultiError{err}
errs.Add(primary.Close())
for _, q := range secondaries {
errs.Add(q.Close())
}
return nil, errs.Err()
}
secondaries = append(secondaries, querier)
}
return NewMergeChunkQuerier(primary, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil
}
func (f *fanout) Appender() Appender { func (f *fanout) Appender() Appender {
primary := f.primary.Appender() primary := f.primary.Appender()
secondaries := make([]Appender, 0, len(f.secondaries)) secondaries := make([]Appender, 0, len(f.secondaries))
@ -220,7 +242,7 @@ func NewMergeQuerier(primary Querier, secondaries []Querier, mergeFn VerticalSer
// //
// In case of overlaps between the data given by primary + secondaries Selects, merge function will be used. // In case of overlaps between the data given by primary + secondaries Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670 // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primary ChunkQuerier, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergerFunc) ChunkQuerier { func NewMergeChunkQuerier(primary ChunkQuerier, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
queriers := make([]genericQuerier, 0, len(secondaries)+1) queriers := make([]genericQuerier, 0, len(secondaries)+1)
if primary != nil { if primary != nil {
queriers = append(queriers, newGenericQuerierFromChunk(primary)) queriers = append(queriers, newGenericQuerierFromChunk(primary))
@ -232,7 +254,7 @@ func NewMergeChunkQuerier(primary ChunkQuerier, secondaries []ChunkQuerier, merg
} }
return &chunkQuerierAdapter{&mergeGenericQuerier{ return &chunkQuerierAdapter{&mergeGenericQuerier{
mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: mergeFn}).Merge, mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFn}).Merge,
queriers: queriers, queriers: queriers,
}} }}
} }
@ -270,7 +292,7 @@ func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matche
return &lazySeriesSet{create: create(seriesSets, q.mergeFn)} return &lazySeriesSet{create: create(seriesSets, q.mergeFn)}
} }
func create(seriesSets []genericSeriesSet, mergeFn genericSeriesMergeFunc) func() (genericSeriesSet, bool) { func create(seriesSets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) func() (genericSeriesSet, bool) {
// Returned function gets called with the first call to Next(). // Returned function gets called with the first call to Next().
return func() (genericSeriesSet, bool) { return func() (genericSeriesSet, bool) {
if len(seriesSets) == 1 { if len(seriesSets) == 1 {
@ -292,7 +314,7 @@ func create(seriesSets []genericSeriesSet, mergeFn genericSeriesMergeFunc) func(
} }
} }
set := &genericMergeSeriesSet{ set := &genericMergeSeriesSet{
mergeFn: mergeFn, mergeFunc: mergeFunc,
sets: seriesSets, sets: seriesSets,
heap: h, heap: h,
} }
@ -361,8 +383,10 @@ func mergeTwoStringSlices(a, b []string) []string {
// LabelNames returns all the unique label names present in the block in sorted order. // LabelNames returns all the unique label names present in the block in sorted order.
func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) { func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
labelNamesMap := make(map[string]struct{}) var (
var warnings Warnings labelNamesMap = make(map[string]struct{})
warnings Warnings
)
for _, querier := range q.queriers { for _, querier := range q.queriers {
names, wrn, err := querier.LabelNames() names, wrn, err := querier.LabelNames()
if wrn != nil { if wrn != nil {
@ -403,34 +427,36 @@ func (q *mergeGenericQuerier) Close() error {
// It has to handle time-overlapped series as well. // It has to handle time-overlapped series as well.
type VerticalSeriesMergeFunc func(...Series) Series type VerticalSeriesMergeFunc func(...Series) Series
// VerticalChunkSeriesMergerFunc returns merged chunk series implementation that merges series with same labels together. // NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
// It has to handle time-overlapped chunk series as well. func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
type VerticalChunkSeriesMergerFunc func(...ChunkSeries) ChunkSeries
// NewMergeSeriesSet returns a new SeriesSet that merges results of chkQuerierSeries SeriesSets.
func NewMergeSeriesSet(sets []SeriesSet, merger VerticalSeriesMergeFunc) SeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets)) genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets { for _, s := range sets {
genericSets = append(genericSets, &genericSeriesSetAdapter{s}) genericSets = append(genericSets, &genericSeriesSetAdapter{s})
} }
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: merger}).Merge)} return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
} }
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges results of chkQuerierSeries ChunkSeriesSets. // VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, merger VerticalChunkSeriesMergerFunc) ChunkSeriesSet { // chunk series with the same labels into single ChunkSeries.
//
// NOTE: It's up to implementation how series are vertically merged (if chunks are sorted, re-encoded etc).
type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets)) genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets { for _, s := range sets {
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s}) genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
} }
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge)} return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
} }
// genericMergeSeriesSet implements genericSeriesSet. // genericMergeSeriesSet implements genericSeriesSet.
type genericMergeSeriesSet struct { type genericMergeSeriesSet struct {
currentLabels labels.Labels currentLabels labels.Labels
mergeFn genericSeriesMergeFunc mergeFunc genericSeriesMergeFunc
heap genericSeriesSetHeap heap genericSeriesSetHeap
sets []genericSeriesSet sets []genericSeriesSet
@ -441,8 +467,8 @@ type genericMergeSeriesSet struct {
// series returned by the series sets when iterating. // series returned by the series sets when iterating.
// Each series set must return its series in labels order, otherwise // Each series set must return its series in labels order, otherwise
// merged series set will be incorrect. // merged series set will be incorrect.
// Overlapping cases are merged using provided mergeFn. // Overlapped situations are merged using provided mergeFunc.
func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFn genericSeriesMergeFunc) genericSeriesSet { func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
if len(sets) == 1 { if len(sets) == 1 {
return sets[0] return sets[0]
} }
@ -459,7 +485,7 @@ func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFn genericSeriesMerg
} }
} }
return &genericMergeSeriesSet{ return &genericMergeSeriesSet{
mergeFn: mergeFn, mergeFunc: mergeFunc,
sets: sets, sets: sets,
heap: h, heap: h,
} }
@ -507,7 +533,7 @@ func (c *genericMergeSeriesSet) At() Labels {
for _, seriesSet := range c.currentSets { for _, seriesSet := range c.currentSets {
series = append(series, seriesSet.At()) series = append(series, seriesSet.At())
} }
return c.mergeFn(series...) return c.mergeFunc(series...)
} }
func (c *genericMergeSeriesSet) Err() error { func (c *genericMergeSeriesSet) Err() error {
@ -549,10 +575,16 @@ func (h *genericSeriesSetHeap) Pop() interface{} {
return x return x
} }
// ChainedSeriesMerge returns single series from many same series by chaining samples together. // ChainedSeriesMerge returns single series from many same, potentially overlapping series by chaining samples together.
// In case of the timestamp overlap, the first overlapped sample is kept and the rest samples with the same timestamps // If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// are dropped. We expect the same labels for each given series. // timestamp are dropped.
// TODO(bwplotka): This has the same logic as tsdb.verticalChainedSeries. Remove this in favor of ChainedSeriesMerge in next PRs. //
// This works the best with replicated series, where data from two series are exactly the same. This does not work well
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
// this never happens.
//
// NOTE: Use this only when you see potentially overlapping series, as this introduces small overhead to handle overlaps
// between series.
func ChainedSeriesMerge(s ...Series) Series { func ChainedSeriesMerge(s ...Series) Series {
if len(s) == 0 { if len(s) == 0 {
return nil return nil
@ -580,8 +612,9 @@ func (m *chainSeries) Iterator() chunkenc.Iterator {
return newChainSampleIterator(iterators) return newChainSampleIterator(iterators)
} }
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series. // chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
// If one or more samples overlap, the first one is kept and all others with the same timestamp are dropped. // order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
type chainSampleIterator struct { type chainSampleIterator struct {
iterators []chunkenc.Iterator iterators []chunkenc.Iterator
h samplesIteratorHeap h samplesIteratorHeap
@ -645,12 +678,13 @@ func (c *chainSampleIterator) Next() bool {
} }
func (c *chainSampleIterator) Err() error { func (c *chainSampleIterator) Err() error {
var errs tsdb_errors.MultiError
for _, iter := range c.iterators { for _, iter := range c.iterators {
if err := iter.Err(); err != nil { if err := iter.Err(); err != nil {
return err errs.Add(err)
} }
} }
return nil return errs.Err()
} }
type samplesIteratorHeap []chunkenc.Iterator type samplesIteratorHeap []chunkenc.Iterator
@ -676,77 +710,76 @@ func (h *samplesIteratorHeap) Pop() interface{} {
return x return x
} }
// VerticalChunkMergeFunc represents a function that merges multiple time overlapping chunks. type compactChunkSeriesMerger struct {
// Passed chunks: mergeFunc VerticalSeriesMergeFunc
// * have to be sorted by MinTime.
// * have to be part of exactly the same timeseries.
// * have to be populated.
type VerticalChunksMergeFunc func(chks ...chunks.Meta) chunks.Iterator
type verticalChunkSeriesMerger struct {
verticalChunksMerger VerticalChunksMergeFunc
labels labels.Labels labels labels.Labels
series []ChunkSeries series []ChunkSeries
} }
// NewVerticalChunkSeriesMerger returns VerticalChunkSeriesMerger that merges the same chunk series into one or more chunks. // NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series.
// In case of the chunk overlap, given VerticalChunkMergeFunc will be used. // In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data.
// Samples from overlapped chunks are merged using series vertical merge func.
// It expects the same labels for each given series. // It expects the same labels for each given series.
func NewVerticalChunkSeriesMerger(chunkMerger VerticalChunksMergeFunc) VerticalChunkSeriesMergerFunc { //
// NOTE: Use this only when you see potentially overlapping series, as this introduces small overhead to handle overlaps
// between series.
func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
return func(s ...ChunkSeries) ChunkSeries { return func(s ...ChunkSeries) ChunkSeries {
if len(s) == 0 { if len(s) == 0 {
return nil return nil
} }
return &verticalChunkSeriesMerger{ return &compactChunkSeriesMerger{
verticalChunksMerger: chunkMerger, mergeFunc: mergeFunc,
labels: s[0].Labels(), labels: s[0].Labels(),
series: s, series: s,
} }
} }
} }
func (s *verticalChunkSeriesMerger) Labels() labels.Labels { func (s *compactChunkSeriesMerger) Labels() labels.Labels {
return s.labels return s.labels
} }
func (s *verticalChunkSeriesMerger) Iterator() chunks.Iterator { func (s *compactChunkSeriesMerger) Iterator() chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(s.series)) iterators := make([]chunks.Iterator, 0, len(s.series))
for _, series := range s.series { for _, series := range s.series {
iterators = append(iterators, series.Iterator()) iterators = append(iterators, series.Iterator())
} }
return &chainChunkIterator{ return &compactChunkIterator{
overlappedChunksMerger: s.verticalChunksMerger, mergeFunc: s.mergeFunc,
labels: s.labels,
iterators: iterators, iterators: iterators,
h: nil,
} }
} }
// chainChunkIterator is responsible to chain chunks from different iterators of same time series. // compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
// If they are time overlapping overlappedChunksMerger will be used. // If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk.
type chainChunkIterator struct { // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
overlappedChunksMerger VerticalChunksMergeFunc type compactChunkIterator struct {
mergeFunc VerticalSeriesMergeFunc
labels labels.Labels
iterators []chunks.Iterator iterators []chunks.Iterator
h chunkIteratorHeap h chunkIteratorHeap
} }
func (c *chainChunkIterator) At() chunks.Meta { func (c *compactChunkIterator) At() chunks.Meta {
if len(c.h) == 0 { if len(c.h) == 0 {
panic("chainChunkIterator.At() called after .Next() returned false.") panic("compactChunkIterator.At() called after .Next() returned false.")
} }
return c.h[0].At() return c.h[0].At()
} }
func (c *chainChunkIterator) Next() bool { func (c *compactChunkIterator) Next() bool {
if c.h == nil { if c.h == nil {
for _, iter := range c.iterators { for _, iter := range c.iterators {
if iter.Next() { if iter.Next() {
heap.Push(&c.h, iter) heap.Push(&c.h, iter)
} }
} }
return len(c.h) > 0 return len(c.h) > 0
} }
@ -754,41 +787,63 @@ func (c *chainChunkIterator) Next() bool {
return false return false
} }
// Detect the shortest chain of time-overlapped chunks. // Detect overlaps to compact.
// Be smart about it and deduplicate on the fly if chunks are identical.
last := c.At() last := c.At()
var overlapped []chunks.Meta var overlapped []Series
for { for {
iter := heap.Pop(&c.h).(chunks.Iterator) iter := heap.Pop(&c.h).(chunks.Iterator)
if iter.Next() { if iter.Next() {
heap.Push(&c.h, iter) heap.Push(&c.h, iter)
} }
if len(c.h) == 0 { if len(c.h) == 0 {
break break
} }
// Get the current oldest chunk by min, then max time.
next := c.At() next := c.At()
if next.MinTime > last.MaxTime { if next.MinTime > last.MaxTime {
// No overlap with last one. // No overlap with last one.
break break
} }
overlapped = append(overlapped, last)
if next.MinTime == last.MinTime &&
next.MaxTime == last.MaxTime &&
bytes.Equal(next.Chunk.Bytes(), last.Chunk.Bytes()) {
// 1:1 duplicates, skip last.
continue
}
overlapped = append(overlapped, &chunkToSeriesDecoder{
labels: c.labels,
Meta: last,
})
last = next last = next
} }
if len(overlapped) > 0 {
heap.Push(&c.h, c.overlappedChunksMerger(append(overlapped, c.At())...)) if len(overlapped) == 0 {
return true
}
return len(c.h) > 0 return len(c.h) > 0
} }
func (c *chainChunkIterator) Err() error { // Add last, not yet included overlap.
overlapped = append(overlapped, &chunkToSeriesDecoder{
labels: c.labels,
Meta: c.At(),
})
var chkSeries ChunkSeries = &seriesToChunkEncoder{Series: c.mergeFunc(overlapped...)}
heap.Push(&c.h, chkSeries)
return true
}
func (c *compactChunkIterator) Err() error {
var errs tsdb_errors.MultiError
for _, iter := range c.iterators { for _, iter := range c.iterators {
if err := iter.Err(); err != nil { if err := iter.Err(); err != nil {
return err errs.Add(err)
} }
} }
return nil return errs.Err()
} }
type chunkIteratorHeap []chunks.Iterator type chunkIteratorHeap []chunks.Iterator

View File

@ -15,9 +15,9 @@ package storage
import ( import (
"context" "context"
"errors"
"testing" "testing"
"github.com/pkg/errors"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
@ -72,6 +72,7 @@ func TestSelectSorted(t *testing.T) {
fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2) fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2)
t.Run("querier", func(t *testing.T) {
querier, err := fanoutStorage.Querier(context.Background(), 0, 8000) querier, err := fanoutStorage.Querier(context.Background(), 0, 8000)
testutil.Ok(t, err) testutil.Ok(t, err)
defer querier.Close() defer querier.Close()
@ -94,9 +95,37 @@ func TestSelectSorted(t *testing.T) {
} }
} }
testutil.Equals(t, labelsResult, outputLabel)
testutil.Equals(t, inputTotalSize, len(result))
})
t.Run("chunk querier", func(t *testing.T) {
t.Skip("TODO(bwplotka: Unskip when db will implement ChunkQuerier.")
querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000)
testutil.Ok(t, err)
defer querier.Close()
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
testutil.Ok(t, err)
seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(true, nil, matcher))
result := make(map[int64]float64)
var labelsResult labels.Labels
for seriesSet.Next() {
series := seriesSet.At()
seriesLabels := series.Labels()
labelsResult = seriesLabels
iterator := series.Iterator()
for iterator.Next() {
timestamp, value := iterator.At()
result[timestamp] = value
}
}
testutil.Ok(t, seriesSet.Err()) testutil.Ok(t, seriesSet.Err())
testutil.Equals(t, labelsResult, outputLabel) testutil.Equals(t, labelsResult, outputLabel)
testutil.Equals(t, inputTotalSize, len(result)) testutil.Equals(t, inputTotalSize, len(result))
})
} }
func TestFanoutErrors(t *testing.T) { func TestFanoutErrors(t *testing.T) {
@ -106,19 +135,19 @@ func TestFanoutErrors(t *testing.T) {
cases := []struct { cases := []struct {
primary storage.Storage primary storage.Storage
secondary storage.Storage secondary storage.Storage
warnings storage.Warnings warning error
err error err error
}{ }{
{ {
primary: workingStorage, primary: workingStorage,
secondary: errStorage{}, secondary: errStorage{},
warnings: storage.Warnings{errSelect}, warning: errSelect,
err: nil, err: nil,
}, },
{ {
primary: errStorage{}, primary: errStorage{},
secondary: workingStorage, secondary: workingStorage,
warnings: nil, warning: nil,
err: errSelect, err: errSelect,
}, },
} }
@ -126,17 +155,55 @@ func TestFanoutErrors(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary) fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary)
t.Run("samples", func(t *testing.T) {
querier, err := fanoutStorage.Querier(context.Background(), 0, 8000) querier, err := fanoutStorage.Querier(context.Background(), 0, 8000)
testutil.Ok(t, err) testutil.Ok(t, err)
defer querier.Close() defer querier.Close()
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b") matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
ss := querier.Select(true, nil, matcher) ss := querier.Select(true, nil, matcher)
// Exhaust.
for ss.Next() { for ss.Next() {
ss.At() ss.At()
} }
testutil.Equals(t, tc.err, ss.Err())
testutil.Equals(t, tc.warnings, ss.Warnings()) if tc.err != nil {
testutil.NotOk(t, ss.Err())
testutil.Equals(t, tc.err.Error(), ss.Err().Error())
}
if tc.warning != nil {
testutil.Assert(t, len(ss.Warnings()) > 0, "warnings expected")
testutil.NotOk(t, ss.Warnings()[0])
testutil.Equals(t, tc.warning.Error(), ss.Warnings()[0].Error())
}
})
t.Run("chunks", func(t *testing.T) {
t.Skip("enable once TestStorage and TSDB implements ChunkQuerier")
querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000)
testutil.Ok(t, err)
defer querier.Close()
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
ss := querier.Select(true, nil, matcher)
// Exhaust.
for ss.Next() {
ss.At()
}
if tc.err != nil {
testutil.NotOk(t, ss.Err())
testutil.Equals(t, tc.err.Error(), ss.Err().Error())
}
if tc.warning != nil {
testutil.Assert(t, len(ss.Warnings()) > 0, "warnings expected")
testutil.NotOk(t, ss.Warnings()[0])
testutil.Equals(t, tc.warning.Error(), ss.Warnings()[0].Error())
}
})
} }
} }
@ -144,23 +211,20 @@ var errSelect = errors.New("select error")
type errStorage struct{} type errStorage struct{}
type errQuerier struct{}
func (errStorage) Querier(_ context.Context, _, _ int64) (storage.Querier, error) { func (errStorage) Querier(_ context.Context, _, _ int64) (storage.Querier, error) {
return errQuerier{}, nil return errQuerier{}, nil
} }
func (errStorage) Appender() storage.Appender { type errChunkQuerier struct{ errQuerier }
return nil
}
func (errStorage) StartTime() (int64, error) { func (errStorage) ChunkQuerier(_ context.Context, _, _ int64) (storage.ChunkQuerier, error) {
return 0, nil return errChunkQuerier{}, nil
} }
func (errStorage) Appender() storage.Appender { return nil }
func (errStorage) Close() error { func (errStorage) StartTime() (int64, error) { return 0, nil }
return nil func (errStorage) Close() error { return nil }
}
type errQuerier struct{}
func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return storage.ErrSeriesSet(errSelect) return storage.ErrSeriesSet(errSelect)
@ -174,6 +238,8 @@ func (errQuerier) LabelNames() ([]string, storage.Warnings, error) {
return nil, nil, errors.New("label names error") return nil, nil, errors.New("label names error")
} }
func (errQuerier) Close() error { func (errQuerier) Close() error { return nil }
return nil
func (errChunkQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet {
return storage.ErrChunkSeriesSet(errSelect)
} }

View File

@ -283,7 +283,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
), ),
}, },
{ {
name: "two queriers, one different series each", name: "two secondaries, one different series each",
chkQuerierSeries: [][]ChunkSeries{{ chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
}, { }, {
@ -295,7 +295,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
), ),
}, },
{ {
name: "two queriers, two not in time order series each", name: "two secondaries, two not in time order series each",
chkQuerierSeries: [][]ChunkSeries{{ chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}), NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
@ -319,7 +319,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
), ),
}, },
{ {
name: "five queriers, only two have two not in time order series each", name: "five secondaries, only two have two not in time order series each",
chkQuerierSeries: [][]ChunkSeries{{}, {}, { chkQuerierSeries: [][]ChunkSeries{{}, {}, {
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}), NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
@ -343,7 +343,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
), ),
}, },
{ {
name: "two queriers, with two not in time order series each, with 3 noop queries and one nil together", name: "two secondaries, with two not in time order series each, with 3 noop queries and one nil together",
chkQuerierSeries: [][]ChunkSeries{{ chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}), NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
@ -391,8 +391,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
} }
qs = append(qs, tc.extraQueriers...) qs = append(qs, tc.extraQueriers...)
// TODO(bwplotka): Add case of overlap to check if those are handled well. merged := NewMergeChunkQuerier(p, qs, NewCompactingChunkSeriesMerger(nil)).Select(false, nil)
merged := NewMergeChunkQuerier(p, qs, NewVerticalChunkSeriesMerger(nil)).Select(false, nil)
for merged.Next() { for merged.Next() {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true") testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At() actualSeries := merged.At()
@ -412,7 +411,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
} }
type mockQuerier struct { type mockQuerier struct {
baseQuerier LabelQuerier
toReturn []Series toReturn []Series
} }
@ -434,7 +433,7 @@ func (m *mockQuerier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Match
} }
type mockChunkQurier struct { type mockChunkQurier struct {
baseQuerier LabelQuerier
toReturn []ChunkSeries toReturn []ChunkSeries
} }
@ -510,22 +509,22 @@ func TestChainSampleIterator(t *testing.T) {
}{ }{
{ {
input: []chunkenc.Iterator{ input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}),
}, },
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
}, },
{ {
input: []chunkenc.Iterator{ input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}), NewListSeriesIterator(samples{sample{2, 2}, sample{3, 3}}),
}, },
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}}, expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}},
}, },
{ {
input: []chunkenc.Iterator{ input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{3, 3}}), NewListSeriesIterator(samples{sample{0, 0}, sample{3, 3}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{1, 1}, sample{4, 4}}), NewListSeriesIterator(samples{sample{1, 1}, sample{4, 4}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{5, 5}}), NewListSeriesIterator(samples{sample{2, 2}, sample{5, 5}}),
}, },
expected: []tsdbutil.Sample{ expected: []tsdbutil.Sample{
sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}}, sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}},
@ -533,12 +532,12 @@ func TestChainSampleIterator(t *testing.T) {
// Overlap. // Overlap.
{ {
input: []chunkenc.Iterator{ input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{2, 2}}), NewListSeriesIterator(samples{sample{0, 0}, sample{2, 2}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}), NewListSeriesIterator(samples{sample{2, 2}, sample{3, 3}}),
NewListSeriesIterator([]tsdbutil.Sample{}), NewListSeriesIterator(samples{}),
NewListSeriesIterator([]tsdbutil.Sample{}), NewListSeriesIterator(samples{}),
NewListSeriesIterator([]tsdbutil.Sample{}), NewListSeriesIterator(samples{}),
}, },
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}}, expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}},
}, },
@ -558,24 +557,24 @@ func TestChainSampleIteratorSeek(t *testing.T) {
}{ }{
{ {
input: []chunkenc.Iterator{ input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}, },
seek: 1, seek: 1,
expected: []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, expected: []tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
}, },
{ {
input: []chunkenc.Iterator{ input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}), NewListSeriesIterator(samples{sample{2, 2}, sample{3, 3}}),
}, },
seek: 2, seek: 2,
expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}}, expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}},
}, },
{ {
input: []chunkenc.Iterator{ input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{3, 3}}), NewListSeriesIterator(samples{sample{0, 0}, sample{3, 3}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{1, 1}, sample{4, 4}}), NewListSeriesIterator(samples{sample{1, 1}, sample{4, 4}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{5, 5}}), NewListSeriesIterator(samples{sample{2, 2}, sample{5, 5}}),
}, },
seek: 2, seek: 2,
expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}}, expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}},

View File

@ -19,7 +19,7 @@ package storage
import "github.com/prometheus/prometheus/pkg/labels" import "github.com/prometheus/prometheus/pkg/labels"
type genericQuerier interface { type genericQuerier interface {
baseQuerier LabelQuerier
Select(bool, *SelectHints, ...*labels.Matcher) genericSeriesSet Select(bool, *SelectHints, ...*labels.Matcher) genericSeriesSet
} }
@ -49,7 +49,7 @@ func (a *genericChunkSeriesSetAdapter) At() Labels {
} }
type genericQuerierAdapter struct { type genericQuerierAdapter struct {
baseQuerier LabelQuerier
// One-of. If both are set, Querier will be used. // One-of. If both are set, Querier will be used.
q Querier q Querier
@ -64,11 +64,11 @@ func (q *genericQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matc
} }
func newGenericQuerierFrom(q Querier) genericQuerier { func newGenericQuerierFrom(q Querier) genericQuerier {
return &genericQuerierAdapter{baseQuerier: q, q: q} return &genericQuerierAdapter{LabelQuerier: q, q: q}
} }
func newGenericQuerierFromChunk(cq ChunkQuerier) genericQuerier { func newGenericQuerierFromChunk(cq ChunkQuerier) genericQuerier {
return &genericQuerierAdapter{baseQuerier: cq, cq: cq} return &genericQuerierAdapter{LabelQuerier: cq, cq: cq}
} }
type querierAdapter struct { type querierAdapter struct {
@ -116,7 +116,7 @@ func (a *seriesMergerAdapter) Merge(s ...Labels) Labels {
} }
type chunkSeriesMergerAdapter struct { type chunkSeriesMergerAdapter struct {
VerticalChunkSeriesMergerFunc VerticalChunkSeriesMergeFunc
} }
func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels { func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels {
@ -124,7 +124,7 @@ func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels {
for _, ser := range s { for _, ser := range s {
buf = append(buf, ser.(ChunkSeries)) buf = append(buf, ser.(ChunkSeries))
} }
return a.VerticalChunkSeriesMergerFunc(buf...) return a.VerticalChunkSeriesMergeFunc(buf...)
} }
type noopGenericSeriesSet struct{} type noopGenericSeriesSet struct{}

View File

@ -37,11 +37,16 @@ type Appendable interface {
Appender() Appender Appender() Appender
} }
// SampleAndChunkQueryable allows retrieving samples as well as encoded samples in form of chunks.
type SampleAndChunkQueryable interface {
Queryable
ChunkQueryable
}
// Storage ingests and manages samples, along with various indexes. All methods // Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender. // are goroutine-safe. Storage implements storage.SampleAppender.
// TODO(bwplotka): Add ChunkQueryable to Storage in next PR.
type Storage interface { type Storage interface {
Queryable SampleAndChunkQueryable
Appendable Appendable
// StartTime returns the oldest timestamp stored in the storage. // StartTime returns the oldest timestamp stored in the storage.
@ -60,7 +65,7 @@ type Queryable interface {
// Querier provides querying access over time series data of a fixed time range. // Querier provides querying access over time series data of a fixed time range.
type Querier interface { type Querier interface {
baseQuerier LabelQuerier
// Select returns a set of series that matches the given label matchers. // Select returns a set of series that matches the given label matchers.
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
@ -72,12 +77,12 @@ type Querier interface {
// Use it when you need to have access to samples in encoded format. // Use it when you need to have access to samples in encoded format.
type ChunkQueryable interface { type ChunkQueryable interface {
// ChunkQuerier returns a new ChunkQuerier on the storage. // ChunkQuerier returns a new ChunkQuerier on the storage.
ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, Warnings, error) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, error)
} }
// ChunkQuerier provides querying access over time series data of a fixed time range. // ChunkQuerier provides querying access over time series data of a fixed time range.
type ChunkQuerier interface { type ChunkQuerier interface {
baseQuerier LabelQuerier
// Select returns a set of series that matches the given label matchers. // Select returns a set of series that matches the given label matchers.
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
@ -85,7 +90,8 @@ type ChunkQuerier interface {
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet
} }
type baseQuerier interface { // LabelQuerier provides querying access over labels.
type LabelQuerier interface {
// LabelValues returns all potential values for a label name. // LabelValues returns all potential values for a label name.
// It is not safe to use the strings beyond the lifefime of the querier. // It is not safe to use the strings beyond the lifefime of the querier.
LabelValues(name string) ([]string, Warnings, error) LabelValues(name string) ([]string, Warnings, error)
@ -111,6 +117,7 @@ type SelectHints struct {
Range int64 // Range vector selector range in milliseconds. Range int64 // Range vector selector range in milliseconds.
} }
// TODO(bwplotka): Move to promql/engine_test.go?
// QueryableFunc is an adapter to allow the use of ordinary functions as // QueryableFunc is an adapter to allow the use of ordinary functions as
// Queryables. It follows the idea of http.HandlerFunc. // Queryables. It follows the idea of http.HandlerFunc.
type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error) type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error)
@ -169,20 +176,40 @@ func EmptySeriesSet() SeriesSet {
} }
type errSeriesSet struct { type errSeriesSet struct {
ws Warnings
err error err error
} }
func (s errSeriesSet) Next() bool { return false } func (s errSeriesSet) Next() bool { return false }
func (s errSeriesSet) At() Series { return nil } func (s errSeriesSet) At() Series { return nil }
func (s errSeriesSet) Err() error { return s.err } func (s errSeriesSet) Err() error { return s.err }
func (s errSeriesSet) Warnings() Warnings { return s.ws } func (s errSeriesSet) Warnings() Warnings { return nil }
// ErrSeriesSet returns a series set that wraps an error. // ErrSeriesSet returns a series set that wraps an error.
func ErrSeriesSet(err error) SeriesSet { func ErrSeriesSet(err error) SeriesSet {
return errSeriesSet{err: err} return errSeriesSet{err: err}
} }
var emptyChunkSeriesSet = errChunkSeriesSet{}
// EmptyChunkSeriesSet returns a chunk series set that's always empty.
func EmptyChunkSeriesSet() ChunkSeriesSet {
return emptyChunkSeriesSet
}
type errChunkSeriesSet struct {
err error
}
func (s errChunkSeriesSet) Next() bool { return false }
func (s errChunkSeriesSet) At() ChunkSeries { return nil }
func (s errChunkSeriesSet) Err() error { return s.err }
func (s errChunkSeriesSet) Warnings() Warnings { return nil }
// ErrChunkSeriesSet returns a chunk series set that wraps an error.
func ErrChunkSeriesSet(err error) ChunkSeriesSet {
return errChunkSeriesSet{err: err}
}
// Series exposes a single time series and allows iterating over samples. // Series exposes a single time series and allows iterating over samples.
type Series interface { type Series interface {
Labels Labels

View File

@ -42,7 +42,8 @@ const maxErrMsgLen = 256
var userAgent = fmt.Sprintf("Prometheus/%s", version.Version) var userAgent = fmt.Sprintf("Prometheus/%s", version.Version)
var remoteReadQueriesTotal = prometheus.NewCounterVec( var (
remoteReadQueriesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -51,28 +52,76 @@ var remoteReadQueriesTotal = prometheus.NewCounterVec(
}, },
[]string{remoteName, endpoint, "code"}, []string{remoteName, endpoint, "code"},
) )
remoteReadQueries = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "remote_read_queries",
Help: "The number of in-flight remote read queries.",
},
[]string{remoteName, endpoint},
)
remoteReadQueryDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_request_duration_seconds",
Help: "Histogram of the latency for remote read requests.",
Buckets: append(prometheus.DefBuckets, 25, 60),
},
[]string{remoteName, endpoint},
)
)
// Client allows reading and writing from/to a remote HTTP endpoint. func init() {
type Client struct { prometheus.MustRegister(remoteReadQueriesTotal, remoteReadQueries, remoteReadQueryDuration)
}
// client allows reading and writing from/to a remote HTTP endpoint.
type client struct {
remoteName string // Used to differentiate clients in metrics. remoteName string // Used to differentiate clients in metrics.
url *config_util.URL url *config_util.URL
client *http.Client client *http.Client
timeout time.Duration timeout time.Duration
readQueries prometheus.Gauge
readQueriesTotal *prometheus.CounterVec
readQueriesDuration prometheus.Observer
} }
// ClientConfig configures a Client. // ClientConfig configures a client.
type ClientConfig struct { type ClientConfig struct {
URL *config_util.URL URL *config_util.URL
Timeout model.Duration Timeout model.Duration
HTTPClientConfig config_util.HTTPClientConfig HTTPClientConfig config_util.HTTPClientConfig
} }
func init() { // ReadClient uses the SAMPLES method of remote read to read series samples from remote server.
prometheus.MustRegister(remoteReadQueriesTotal) // TODO(bwplotka): Add streamed chunked remote read method as well (https://github.com/prometheus/prometheus/issues/5926).
type ReadClient interface {
Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error)
} }
// NewClient creates a new Client. // newReadClient creates a new client for remote read.
func NewClient(remoteName string, conf *ClientConfig) (*Client, error) { func newReadClient(name string, conf *ClientConfig) (ReadClient, error) {
httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage", false)
if err != nil {
return nil, err
}
return &client{
remoteName: name,
url: conf.URL,
client: httpClient,
timeout: time.Duration(conf.Timeout),
readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()),
readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}),
readQueriesDuration: remoteReadQueryDuration.WithLabelValues(name, conf.URL.String()),
}, nil
}
// NewWriteClient creates a new client for remote write.
func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage", false) httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage", false)
if err != nil { if err != nil {
return nil, err return nil, err
@ -83,8 +132,8 @@ func NewClient(remoteName string, conf *ClientConfig) (*Client, error) {
RoundTripper: t, RoundTripper: t,
} }
return &Client{ return &client{
remoteName: remoteName, remoteName: name,
url: conf.URL, url: conf.URL,
client: httpClient, client: httpClient,
timeout: time.Duration(conf.Timeout), timeout: time.Duration(conf.Timeout),
@ -97,7 +146,7 @@ type recoverableError struct {
// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled // Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
// and encoded bytes from codec.go. // and encoded bytes from codec.go.
func (c *Client) Store(ctx context.Context, req []byte) error { func (c *client) Store(ctx context.Context, req []byte) error {
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req)) httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req))
if err != nil { if err != nil {
// Errors from NewRequest are from unparsable URLs, so are not // Errors from NewRequest are from unparsable URLs, so are not
@ -150,17 +199,20 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
} }
// Name uniquely identifies the client. // Name uniquely identifies the client.
func (c Client) Name() string { func (c client) Name() string {
return c.remoteName return c.remoteName
} }
// Endpoint is the remote read or write endpoint. // Endpoint is the remote read or write endpoint.
func (c Client) Endpoint() string { func (c client) Endpoint() string {
return c.url.String() return c.url.String()
} }
// Read reads from a remote endpoint. // Read reads from a remote endpoint.
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { func (c *client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
c.readQueries.Inc()
defer c.readQueries.Dec()
req := &prompb.ReadRequest{ req := &prompb.ReadRequest{
// TODO: Support batching multiple queries into one read request, // TODO: Support batching multiple queries into one read request,
// as the protobuf interface allows for it. // as the protobuf interface allows for it.
@ -200,6 +252,7 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe
defer ht.Finish() defer ht.Finish()
} }
start := time.Now()
httpResp, err := c.client.Do(httpReq) httpResp, err := c.client.Do(httpReq)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "error sending request") return nil, errors.Wrap(err, "error sending request")
@ -208,9 +261,8 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe
io.Copy(ioutil.Discard, httpResp.Body) io.Copy(ioutil.Discard, httpResp.Body)
httpResp.Body.Close() httpResp.Body.Close()
}() }()
c.readQueriesDuration.Observe(time.Since(start).Seconds())
remoteReadTotalCounter := remoteReadQueriesTotal.WithLabelValues(c.remoteName, c.url.String(), strconv.Itoa(httpResp.StatusCode)) c.readQueriesTotal.WithLabelValues(strconv.Itoa(httpResp.StatusCode)).Inc()
remoteReadTotalCounter.Inc()
compressed, err = ioutil.ReadAll(httpResp.Body) compressed, err = ioutil.ReadAll(httpResp.Body)
if err != nil { if err != nil {

View File

@ -25,7 +25,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
config_util "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -71,7 +70,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
hash, err := toHash(conf) hash, err := toHash(conf)
testutil.Ok(t, err) testutil.Ok(t, err)
c, err := NewClient(hash, conf) c, err := NewWriteClient(hash, conf)
testutil.Ok(t, err) testutil.Ok(t, err)
err = c.Store(context.Background(), []byte{}) err = c.Store(context.Background(), []byte{})

View File

@ -138,25 +138,18 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
Samples: samples, Samples: samples,
}) })
} }
if err := ss.Err(); err != nil { return resp, ss.Warnings(), ss.Err()
return nil, ss.Warnings(), err
}
return resp, ss.Warnings(), nil
} }
// FromQueryResult unpacks and sorts a QueryResult proto. // FromQueryResult unpacks and sorts a QueryResult proto.
func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet { func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet {
series := make([]storage.Series, 0, len(res.Timeseries)) series := make([]storage.Series, 0, len(res.Timeseries))
for _, ts := range res.Timeseries { for _, ts := range res.Timeseries {
labels := labelProtosToLabels(ts.Labels) lbls := labelProtosToLabels(ts.Labels)
if err := validateLabelsAndMetricName(labels); err != nil { if err := validateLabelsAndMetricName(lbls); err != nil {
return errSeriesSet{err: err} return errSeriesSet{err: err}
} }
series = append(series, &concreteSeries{labels: lbls, samples: ts.Samples})
series = append(series, &concreteSeries{
labels: labels,
samples: ts.Samples,
})
} }
if sortSeries { if sortSeries {
@ -187,9 +180,8 @@ func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.R
return 0, errors.Errorf("server does not support any of the requested response types: %v; supported: %v", accepted, supported) return 0, errors.Errorf("server does not support any of the requested response types: %v; supported: %v", accepted, supported)
} }
// StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller. // TODO(bwlpotka): Remove when tsdb will support ChunkQuerier.
// TODO(bwplotka): Encode only what's needed. Fetch the encoded series from blocks instead of re-encoding everything. func DeprecatedStreamChunkedReadResponses(
func StreamChunkedReadResponses(
stream io.Writer, stream io.Writer,
queryIndex int64, queryIndex int64,
ss storage.SeriesSet, ss storage.SeriesSet,
@ -315,6 +307,77 @@ func encodeChunks(iter chunkenc.Iterator, chks []prompb.Chunk, frameBytesLeft in
return chks, nil return chks, nil
} }
// StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller.
// It expects Series set with populated chunks.
func StreamChunkedReadResponses(
stream io.Writer,
queryIndex int64,
ss storage.ChunkSeriesSet,
sortedExternalLabels []prompb.Label,
maxBytesInFrame int,
) (storage.Warnings, error) {
var (
chks []prompb.Chunk
lbls []prompb.Label
)
for ss.Next() {
series := ss.At()
iter := series.Iterator()
lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels)
frameBytesLeft := maxBytesInFrame
for _, lbl := range lbls {
frameBytesLeft -= lbl.Size()
}
isNext := iter.Next()
// Send at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
for isNext {
chk := iter.At()
if chk.Chunk == nil {
return ss.Warnings(), errors.Errorf("StreamChunkedReadResponses: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
}
// Cut the chunk.
chks = append(chks, prompb.Chunk{
MinTimeMs: chk.MinTime,
MaxTimeMs: chk.MaxTime,
Type: prompb.Chunk_Encoding(chk.Chunk.Encoding()),
Data: chk.Chunk.Bytes(),
})
frameBytesLeft -= chks[len(chks)-1].Size()
// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
isNext = iter.Next()
if frameBytesLeft > 0 && isNext {
continue
}
b, err := proto.Marshal(&prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{
{Labels: lbls, Chunks: chks},
},
QueryIndex: queryIndex,
})
if err != nil {
return ss.Warnings(), errors.Wrap(err, "marshal ChunkedReadResponse")
}
if _, err := stream.Write(b); err != nil {
return ss.Warnings(), errors.Wrap(err, "write to stream")
}
chks = chks[:0]
}
if err := iter.Err(); err != nil {
return ss.Warnings(), err
}
}
return ss.Warnings(), ss.Err()
}
// MergeLabels merges two sets of sorted proto labels, preferring those in // MergeLabels merges two sets of sorted proto labels, preferring those in
// primary to those in secondary when there is an overlap. // primary to those in secondary when there is an overlap.
func MergeLabels(primary, secondary []prompb.Label) []prompb.Label { func MergeLabels(primary, secondary []prompb.Label) []prompb.Label {

View File

@ -217,9 +217,9 @@ func (m *queueManagerMetrics) unregister() {
} }
} }
// StorageClient defines an interface for sending a batch of samples to an // WriteClient defines an interface for sending a batch of samples to an
// external timeseries database. // external timeseries database.
type StorageClient interface { type WriteClient interface {
// Store stores the given samples in the remote storage. // Store stores the given samples in the remote storage.
Store(context.Context, []byte) error Store(context.Context, []byte) error
// Name uniquely identifies the remote storage. // Name uniquely identifies the remote storage.
@ -229,7 +229,7 @@ type StorageClient interface {
} }
// QueueManager manages a queue of samples to be sent to the Storage // QueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided StorageClient. Implements writeTo interface // indicated by the provided WriteClient. Implements writeTo interface
// used by WAL Watcher. // used by WAL Watcher.
type QueueManager struct { type QueueManager struct {
// https://golang.org/pkg/sync/atomic/#pkg-note-BUG // https://golang.org/pkg/sync/atomic/#pkg-note-BUG
@ -243,7 +243,7 @@ type QueueManager struct {
watcher *wal.Watcher watcher *wal.Watcher
clientMtx sync.RWMutex clientMtx sync.RWMutex
storeClient StorageClient storeClient WriteClient
seriesMtx sync.Mutex seriesMtx sync.Mutex
seriesLabels map[uint64]labels.Labels seriesLabels map[uint64]labels.Labels
@ -272,7 +272,7 @@ func NewQueueManager(
cfg config.QueueConfig, cfg config.QueueConfig,
externalLabels labels.Labels, externalLabels labels.Labels,
relabelConfigs []*relabel.Config, relabelConfigs []*relabel.Config,
client StorageClient, client WriteClient,
flushDeadline time.Duration, flushDeadline time.Duration,
) *QueueManager { ) *QueueManager {
if logger == nil { if logger == nil {
@ -440,13 +440,13 @@ func (t *QueueManager) SeriesReset(index int) {
// SetClient updates the client used by a queue. Used when only client specific // SetClient updates the client used by a queue. Used when only client specific
// fields are updated to avoid restarting the queue. // fields are updated to avoid restarting the queue.
func (t *QueueManager) SetClient(c StorageClient) { func (t *QueueManager) SetClient(c WriteClient) {
t.clientMtx.Lock() t.clientMtx.Lock()
t.storeClient = c t.storeClient = c
t.clientMtx.Unlock() t.clientMtx.Unlock()
} }
func (t *QueueManager) client() StorageClient { func (t *QueueManager) client() WriteClient {
t.clientMtx.RLock() t.clientMtx.RLock()
defer t.clientMtx.RUnlock() defer t.clientMtx.RUnlock()
return t.storeClient return t.storeClient

View File

@ -51,7 +51,7 @@ func TestSampleDelivery(t *testing.T) {
n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 n := config.DefaultQueueConfig.MaxSamplesPerSend * 2
samples, series := createTimeseries(n, n) samples, series := createTimeseries(n, n)
c := NewTestStorageClient() c := NewTestWriteClient()
c.expectSamples(samples[:len(samples)/2], series) c.expectSamples(samples[:len(samples)/2], series)
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
@ -81,7 +81,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration // Let's send one less sample than batch size, and wait the timeout duration
n := 9 n := 9
samples, series := createTimeseries(n, n) samples, series := createTimeseries(n, n)
c := NewTestStorageClient() c := NewTestWriteClient()
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
cfg.MaxShards = 1 cfg.MaxShards = 1
@ -125,7 +125,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
}) })
} }
c := NewTestStorageClient() c := NewTestWriteClient()
c.expectSamples(samples, series) c.expectSamples(samples, series)
dir, err := ioutil.TempDir("", "TestSampleDeliveryOrder") dir, err := ioutil.TempDir("", "TestSampleDeliveryOrder")
@ -145,7 +145,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
func TestShutdown(t *testing.T) { func TestShutdown(t *testing.T) {
deadline := 1 * time.Second deadline := 1 * time.Second
c := NewTestBlockedStorageClient() c := NewTestBlockedWriteClient()
dir, err := ioutil.TempDir("", "TestShutdown") dir, err := ioutil.TempDir("", "TestShutdown")
testutil.Ok(t, err) testutil.Ok(t, err)
@ -181,7 +181,7 @@ func TestShutdown(t *testing.T) {
} }
func TestSeriesReset(t *testing.T) { func TestSeriesReset(t *testing.T) {
c := NewTestBlockedStorageClient() c := NewTestBlockedWriteClient()
deadline := 5 * time.Second deadline := 5 * time.Second
numSegments := 4 numSegments := 4
numSeries := 25 numSeries := 25
@ -210,7 +210,7 @@ func TestReshard(t *testing.T) {
nSamples := config.DefaultQueueConfig.Capacity * size nSamples := config.DefaultQueueConfig.Capacity * size
samples, series := createTimeseries(nSamples, nSeries) samples, series := createTimeseries(nSamples, nSeries)
c := NewTestStorageClient() c := NewTestWriteClient()
c.expectSamples(samples, series) c.expectSamples(samples, series)
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
@ -245,7 +245,7 @@ func TestReshard(t *testing.T) {
} }
func TestReshardRaceWithStop(t *testing.T) { func TestReshardRaceWithStop(t *testing.T) {
c := NewTestStorageClient() c := NewTestWriteClient()
var m *QueueManager var m *QueueManager
h := sync.Mutex{} h := sync.Mutex{}
@ -271,7 +271,7 @@ func TestReshardRaceWithStop(t *testing.T) {
func TestReleaseNoninternedString(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestStorageClient() c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
m.Start() m.Start()
@ -319,7 +319,7 @@ func TestShouldReshard(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestStorageClient() client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline) m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline)
m.numShards = c.startingShards m.numShards = c.startingShards
m.samplesIn.incr(c.samplesIn) m.samplesIn.incr(c.samplesIn)
@ -367,7 +367,7 @@ func getSeriesNameFromRef(r record.RefSeries) string {
return "" return ""
} }
type TestStorageClient struct { type TestWriteClient struct {
receivedSamples map[string][]prompb.Sample receivedSamples map[string][]prompb.Sample
expectedSamples map[string][]prompb.Sample expectedSamples map[string][]prompb.Sample
withWaitGroup bool withWaitGroup bool
@ -376,15 +376,15 @@ type TestStorageClient struct {
buf []byte buf []byte
} }
func NewTestStorageClient() *TestStorageClient { func NewTestWriteClient() *TestWriteClient {
return &TestStorageClient{ return &TestWriteClient{
withWaitGroup: true, withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{}, receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{}, expectedSamples: map[string][]prompb.Sample{},
} }
} }
func (c *TestStorageClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
if !c.withWaitGroup { if !c.withWaitGroup {
return return
} }
@ -404,7 +404,7 @@ func (c *TestStorageClient) expectSamples(ss []record.RefSample, series []record
c.wg.Add(len(ss)) c.wg.Add(len(ss))
} }
func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) { func (c *TestWriteClient) waitForExpectedSamples(tb testing.TB) {
if !c.withWaitGroup { if !c.withWaitGroup {
return return
} }
@ -418,7 +418,7 @@ func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) {
} }
} }
func (c *TestStorageClient) expectSampleCount(numSamples int) { func (c *TestWriteClient) expectSampleCount(numSamples int) {
if !c.withWaitGroup { if !c.withWaitGroup {
return return
} }
@ -427,14 +427,14 @@ func (c *TestStorageClient) expectSampleCount(numSamples int) {
c.wg.Add(numSamples) c.wg.Add(numSamples)
} }
func (c *TestStorageClient) waitForExpectedSampleCount() { func (c *TestWriteClient) waitForExpectedSampleCount() {
if !c.withWaitGroup { if !c.withWaitGroup {
return return
} }
c.wg.Wait() c.wg.Wait()
} }
func (c *TestStorageClient) Store(_ context.Context, req []byte) error { func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
c.mtx.Lock() c.mtx.Lock()
defer c.mtx.Unlock() defer c.mtx.Unlock()
// nil buffers are ok for snappy, ignore cast error. // nil buffers are ok for snappy, ignore cast error.
@ -472,41 +472,41 @@ func (c *TestStorageClient) Store(_ context.Context, req []byte) error {
return nil return nil
} }
func (c *TestStorageClient) Name() string { func (c *TestWriteClient) Name() string {
return "teststorageclient" return "testwriteclient"
} }
func (c *TestStorageClient) Endpoint() string { func (c *TestWriteClient) Endpoint() string {
return "http://test-remote.com/1234" return "http://test-remote.com/1234"
} }
// TestBlockingStorageClient is a queue_manager StorageClient which will block // TestBlockingWriteClient is a queue_manager WriteClient which will block
// on any calls to Store(), until the request's Context is cancelled, at which // on any calls to Store(), until the request's Context is cancelled, at which
// point the `numCalls` property will contain a count of how many times Store() // point the `numCalls` property will contain a count of how many times Store()
// was called. // was called.
type TestBlockingStorageClient struct { type TestBlockingWriteClient struct {
numCalls uint64 numCalls uint64
} }
func NewTestBlockedStorageClient() *TestBlockingStorageClient { func NewTestBlockedWriteClient() *TestBlockingWriteClient {
return &TestBlockingStorageClient{} return &TestBlockingWriteClient{}
} }
func (c *TestBlockingStorageClient) Store(ctx context.Context, _ []byte) error { func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte) error {
atomic.AddUint64(&c.numCalls, 1) atomic.AddUint64(&c.numCalls, 1)
<-ctx.Done() <-ctx.Done()
return nil return nil
} }
func (c *TestBlockingStorageClient) NumCalls() uint64 { func (c *TestBlockingWriteClient) NumCalls() uint64 {
return atomic.LoadUint64(&c.numCalls) return atomic.LoadUint64(&c.numCalls)
} }
func (c *TestBlockingStorageClient) Name() string { func (c *TestBlockingWriteClient) Name() string {
return "testblockingstorageclient" return "testblockingwriteclient"
} }
func (c *TestBlockingStorageClient) Endpoint() string { func (c *TestBlockingWriteClient) Endpoint() string {
return "http://test-remote-blocking.com/1234" return "http://test-remote-blocking.com/1234"
} }
@ -516,7 +516,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
n := config.DefaultQueueConfig.MaxSamplesPerSend * 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * 10
samples, series := createTimeseries(n, n) samples, series := createTimeseries(n, n)
c := NewTestStorageClient() c := NewTestWriteClient()
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
@ -568,7 +568,7 @@ func BenchmarkStartup(b *testing.B) {
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
metrics := newQueueManagerMetrics(nil, "", "") metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestBlockedStorageClient() c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir, m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration), newEWMARate(ewmaWeight, shardUpdateDuration),
config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
@ -611,7 +611,7 @@ func TestProcessExternalLabels(t *testing.T) {
} }
func TestCalculateDesiredShards(t *testing.T) { func TestCalculateDesiredShards(t *testing.T) {
c := NewTestStorageClient() c := NewTestWriteClient()
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
dir, err := ioutil.TempDir("", "TestCalculateDesiredShards") dir, err := ioutil.TempDir("", "TestCalculateDesiredShards")

View File

@ -15,190 +15,162 @@ package remote
import ( import (
"context" "context"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
var remoteReadQueries = prometheus.NewGaugeVec( type sampleAndChunkQueryableClient struct {
prometheus.GaugeOpts{ client ReadClient
Namespace: namespace, externalLabels labels.Labels
Subsystem: subsystem, requiredMatchers []*labels.Matcher
Name: "remote_read_queries", readRecent bool
Help: "The number of in-flight remote read queries.", callback startTimeCallback
},
[]string{remoteName, endpoint},
)
var remoteReadQueriesHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_request_duration_seconds",
Help: "Histogram of the latency for remote read requests.",
Buckets: append(prometheus.DefBuckets, 25, 60),
},
[]string{remoteName, endpoint},
)
func init() {
prometheus.MustRegister(remoteReadQueries, remoteReadQueriesHistogram)
} }
// QueryableClient returns a storage.Queryable which queries the given // NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets.
// Client to select series sets. func NewSampleAndChunkQueryableClient(
func QueryableClient(c *Client) storage.Queryable { c ReadClient,
remoteReadQueries.WithLabelValues(c.remoteName, c.url.String()) externalLabels labels.Labels,
remoteReadQueriesHistogram.WithLabelValues(c.remoteName, c.url.String()) requiredMatchers []*labels.Matcher,
readRecent bool,
callback startTimeCallback,
) storage.SampleAndChunkQueryable {
return &sampleAndChunkQueryableClient{
client: c,
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { externalLabels: externalLabels,
return &querier{ requiredMatchers: requiredMatchers,
readRecent: readRecent,
callback: callback,
}
}
func (c *sampleAndChunkQueryableClient) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
q := &querier{
ctx: ctx, ctx: ctx,
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
client: c, client: c.client,
}, nil externalLabels: c.externalLabels,
}) requiredMatchers: c.requiredMatchers,
}
if c.readRecent {
return q, nil
} }
// querier is an adapter to make a Client usable as a storage.Querier. var (
type querier struct { noop bool
ctx context.Context err error
mint, maxt int64 )
client *Client q.maxt, noop, err = c.preferLocalStorage(mint, maxt)
}
// Select implements storage.Querier and uses the given matchers to read series sets from the Client.
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
query, err := ToQuery(q.mint, q.maxt, matchers, hints)
if err != nil {
return storage.ErrSeriesSet(err)
}
remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.remoteName, q.client.url.String())
remoteReadGauge.Inc()
defer remoteReadGauge.Dec()
remoteReadQueriesHistogram := remoteReadQueriesHistogram.WithLabelValues(q.client.remoteName, q.client.url.String())
remoteReadQueriesHistogram.Observe(time.Since(time.Now()).Seconds())
res, err := q.client.Read(q.ctx, query)
if err != nil {
return storage.ErrSeriesSet(fmt.Errorf("remote_read: %v", err))
}
return FromQueryResult(sortSeries, res)
}
// LabelValues implements storage.Querier and is a noop.
func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) {
// TODO implement?
return nil, nil, nil
}
// LabelNames implements storage.Querier and is a noop.
func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
// TODO implement?
return nil, nil, nil
}
// Close implements storage.Querier and is a noop.
func (q *querier) Close() error {
return nil
}
// ExternalLabelsHandler returns a storage.Queryable which creates a
// externalLabelsQuerier.
func ExternalLabelsHandler(next storage.Queryable, externalLabels labels.Labels) storage.Queryable {
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
q, err := next.Querier(ctx, mint, maxt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &externalLabelsQuerier{Querier: q, externalLabels: externalLabels}, nil if noop {
}) return storage.NoopQuerier(), nil
}
return q, nil
} }
// externalLabelsQuerier is a querier which ensures that Select() results match func (c *sampleAndChunkQueryableClient) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
// the configured external labels. cq := &chunkQuerier{
type externalLabelsQuerier struct { querier: querier{
storage.Querier ctx: ctx,
mint: mint,
externalLabels labels.Labels maxt: maxt,
client: c.client,
externalLabels: c.externalLabels,
requiredMatchers: c.requiredMatchers,
},
}
if c.readRecent {
return cq, nil
} }
// Select adds equality matchers for all external labels to the list of matchers var (
// before calling the wrapped storage.Queryable. The added external labels are noop bool
// removed from the returned series sets. err error
func (q externalLabelsQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { )
m, added := q.addExternalLabels(matchers) cq.querier.maxt, noop, err = c.preferLocalStorage(mint, maxt)
return newSeriesSetFilter(q.Querier.Select(sortSeries, hints, m...), added) if err != nil {
return nil, err
}
if noop {
return storage.NoopChunkedQuerier(), nil
}
return cq, nil
} }
// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier // preferLocalStorage returns noop if requested timeframe can be answered completely by the local TSDB, and
// if requested timeframe can be answered completely by the local TSDB, and
// reduces maxt if the timeframe can be partially answered by TSDB. // reduces maxt if the timeframe can be partially answered by TSDB.
func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) storage.Queryable { func (c *sampleAndChunkQueryableClient) preferLocalStorage(mint, maxt int64) (cmaxt int64, noop bool, err error) {
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { localStartTime, err := c.callback()
localStartTime, err := cb()
if err != nil { if err != nil {
return nil, err return 0, false, err
} }
cmaxt := maxt cmaxt = maxt
// Avoid queries whose time range is later than the first timestamp in local DB. // Avoid queries whose time range is later than the first timestamp in local DB.
if mint > localStartTime { if mint > localStartTime {
return storage.NoopQuerier(), nil return 0, true, nil
} }
// Query only samples older than the first timestamp in local DB. // Query only samples older than the first timestamp in local DB.
if maxt > localStartTime { if maxt > localStartTime {
cmaxt = localStartTime cmaxt = localStartTime
} }
return next.Querier(ctx, mint, cmaxt) return cmaxt, false, nil
})
} }
// RequiredMatchersFilter returns a storage.Queryable which creates a type querier struct {
// requiredMatchersQuerier. ctx context.Context
func RequiredMatchersFilter(next storage.Queryable, required []*labels.Matcher) storage.Queryable { mint, maxt int64
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { client ReadClient
q, err := next.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
}
return &requiredMatchersQuerier{Querier: q, requiredMatchers: required}, nil
})
}
// requiredMatchersQuerier wraps a storage.Querier and requires Select() calls
// to match the given labelSet.
type requiredMatchersQuerier struct {
storage.Querier
// Derived from configuration.
externalLabels labels.Labels
requiredMatchers []*labels.Matcher requiredMatchers []*labels.Matcher
} }
// Select returns a NoopSeriesSet if the given matchers don't match the label // Select implements storage.Querier and uses the given matchers to read series sets from the client.
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. // Select also adds equality matchers for all external labels to the list of matchers before calling remote endpoint.
func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { // The added external labels are removed from the returned series sets.
ms := q.requiredMatchers //
// If requiredMatchers are given, select returns a NoopSeriesSet if the given matchers don't match the label set of the
// requiredMatchers. Otherwise it'll just call remote endpoint.
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
if len(q.requiredMatchers) > 0 {
// Copy to not modify slice configured by user.
requiredMatchers := append([]*labels.Matcher{}, q.requiredMatchers...)
for _, m := range matchers { for _, m := range matchers {
for i, r := range ms { for i, r := range requiredMatchers {
if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value { if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value {
ms = append(ms[:i], ms[i+1:]...) // Requirement matched.
requiredMatchers = append(requiredMatchers[:i], requiredMatchers[i+1:]...)
break break
} }
} }
if len(ms) == 0 { if len(requiredMatchers) == 0 {
break break
} }
} }
if len(ms) > 0 { if len(requiredMatchers) > 0 {
return storage.NoopSeriesSet() return storage.NoopSeriesSet()
} }
return q.Querier.Select(sortSeries, hints, matchers...) }
m, added := q.addExternalLabels(matchers)
query, err := ToQuery(q.mint, q.maxt, m, hints)
if err != nil {
return storage.ErrSeriesSet(errors.Wrap(err, "toQuery"))
}
res, err := q.client.Read(q.ctx, query)
if err != nil {
return storage.ErrSeriesSet(errors.Wrap(err, "remote_read"))
}
return newSeriesSetFilter(FromQueryResult(sortSeries, res), added)
} }
// addExternalLabels adds matchers for each external label. External labels // addExternalLabels adds matchers for each external label. External labels
@ -207,7 +179,7 @@ func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHi
// We return the new set of matchers, along with a map of labels for which // We return the new set of matchers, along with a map of labels for which
// matchers were added, so that these can later be removed from the result // matchers were added, so that these can later be removed from the result
// time series again. // time series again.
func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) { func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) {
el := make(labels.Labels, len(q.externalLabels)) el := make(labels.Labels, len(q.externalLabels))
copy(el, q.externalLabels) copy(el, q.externalLabels)
@ -232,6 +204,35 @@ func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*label
return ms, el return ms, el
} }
// LabelValues implements storage.Querier and is a noop.
func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) {
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
return nil, nil, errors.New("not implemented")
}
// LabelNames implements storage.Querier and is a noop.
func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
return nil, nil, errors.New("not implemented")
}
// Close implements storage.Querier and is a noop.
func (q *querier) Close() error {
return nil
}
// chunkQuerier is an adapter to make a client usable as a storage.ChunkQuerier.
type chunkQuerier struct {
querier
}
// Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client.
// It uses remote.querier.Select so it supports external labels and required matchers if specified.
func (q *chunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
// TODO(bwplotka) Support remote read chunked and allow returning chunks directly (TODO ticket).
return storage.NewSeriesSetToChunkSet(q.querier.Select(sortSeries, hints, matchers...))
}
func newSeriesSetFilter(ss storage.SeriesSet, toFilter labels.Labels) storage.SeriesSet { func newSeriesSetFilter(ss storage.SeriesSet, toFilter labels.Labels) storage.SeriesSet {
return &seriesSetFilter{ return &seriesSetFilter{
SeriesSet: ss, SeriesSet: ss,

View File

@ -22,6 +22,7 @@ import (
"sort" "sort"
"testing" "testing"
"github.com/pkg/errors"
config_util "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
@ -40,7 +41,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
URL: &config_util.URL{ URL: &config_util.URL{
URL: &url.URL{ URL: &url.URL{
Scheme: "http", Scheme: "http",
Host: "localhost", Host: "localhost1",
}, },
}, },
} }
@ -49,7 +50,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
URL: &config_util.URL{ URL: &config_util.URL{
URL: &url.URL{ URL: &url.URL{
Scheme: "http", Scheme: "http",
Host: "localhost", Host: "localhost2",
}, },
}, },
} }
@ -57,7 +58,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
URL: &config_util.URL{ URL: &config_util.URL{
URL: &url.URL{ URL: &url.URL{
Scheme: "http", Scheme: "http",
Host: "localhost", Host: "localhost3",
}, },
}, },
} }
@ -92,6 +93,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
} }
for _, tc := range cases { for _, tc := range cases {
t.Run("", func(t *testing.T) {
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline) s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline)
conf := &config.Config{ conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig, GlobalConfig: config.DefaultGlobalConfig,
@ -100,27 +102,8 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
err := s.ApplyConfig(conf) err := s.ApplyConfig(conf)
gotError := err != nil gotError := err != nil
testutil.Equals(t, tc.err, gotError) testutil.Equals(t, tc.err, gotError)
testutil.Ok(t, s.Close())
err = s.Close() })
testutil.Ok(t, err)
}
}
func TestExternalLabelsQuerierSelect(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "job", "api-server"),
}
q := &externalLabelsQuerier{
Querier: mockQuerier{},
externalLabels: labels.Labels{
{Name: "region", Value: "europe"},
},
}
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
have := q.Select(false, nil, matchers...)
if !reflect.DeepEqual(want, have) {
t.Errorf("expected series set %+v, got %+v", want, have)
} }
} }
@ -179,7 +162,7 @@ func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) {
} }
for i, test := range tests { for i, test := range tests {
q := &externalLabelsQuerier{Querier: mockQuerier{}, externalLabels: test.el} q := &querier{externalLabels: test.el}
matchers, added := q.addExternalLabels(test.inMatchers) matchers, added := q.addExternalLabels(test.inMatchers)
sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name }) sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name })
@ -225,180 +208,307 @@ func TestSeriesSetFilter(t *testing.T) {
} }
} }
type mockQuerier struct { type mockedRemoteClient struct {
ctx context.Context got *prompb.Query
mint, maxt int64 store []*prompb.TimeSeries
storage.Querier
} }
type mockSeriesSet struct { func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
storage.SeriesSet if c.got != nil {
return nil, errors.Errorf("expected only one call to remote client got: %v", query)
} }
c.got = query
func (mockQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { matchers, err := FromLabelMatchers(query.Matchers)
return mockSeriesSet{}
}
func TestPreferLocalStorageFilter(t *testing.T) {
ctx := context.Background()
tests := []struct {
localStartTime int64
mint int64
maxt int64
querier storage.Querier
}{
{
localStartTime: int64(100),
mint: int64(0),
maxt: int64(50),
querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50},
},
{
localStartTime: int64(20),
mint: int64(0),
maxt: int64(50),
querier: mockQuerier{ctx: ctx, mint: 0, maxt: 20},
},
{
localStartTime: int64(20),
mint: int64(30),
maxt: int64(50),
querier: storage.NoopQuerier(),
},
}
for i, test := range tests {
f := PreferLocalStorageFilter(
storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
}),
func() (int64, error) { return test.localStartTime, nil },
)
q, err := f.Querier(ctx, test.mint, test.maxt)
if err != nil { if err != nil {
t.Fatal(err) return nil, err
} }
if test.querier != q { q := &prompb.QueryResult{}
t.Errorf("%d. expected querier %+v, got %+v", i, test.querier, q) for _, s := range c.store {
l := labelProtosToLabels(s.Labels)
var notMatch bool
for _, m := range matchers {
if v := l.Get(m.Name); v != "" {
if !m.Matches(v) {
notMatch = true
break
} }
} }
} }
func TestRequiredMatchersFilter(t *testing.T) { if !notMatch {
ctx := context.Background() q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels})
f := RequiredMatchersFilter(
storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil
}),
[]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "special", "label")},
)
want := &requiredMatchersQuerier{
Querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50},
requiredMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "special", "label")},
}
have, err := f.Querier(ctx, 0, 50)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(want, have) {
t.Errorf("expected querier %+v, got %+v", want, have)
} }
} }
return q, nil
}
func TestRequiredLabelsQuerierSelect(t *testing.T) { func (c *mockedRemoteClient) reset() {
tests := []struct { c.got = nil
requiredMatchers []*labels.Matcher }
// NOTE: We don't need to test ChunkQuerier as it's uses querier for all operations anyway.
func TestSampleAndChunkQueryableClient(t *testing.T) {
m := &mockedRemoteClient{
// Samples does not matter for below tests.
store: []*prompb.TimeSeries{
{Labels: []prompb.Label{{Name: "a", Value: "b"}}},
{Labels: []prompb.Label{{Name: "a", Value: "b3"}, {Name: "region", Value: "us"}}},
{Labels: []prompb.Label{{Name: "a", Value: "b2"}, {Name: "region", Value: "europe"}}},
},
}
for _, tc := range []struct {
name string
matchers []*labels.Matcher matchers []*labels.Matcher
seriesSet storage.SeriesSet mint, maxt int64
externalLabels labels.Labels
requiredMatchers []*labels.Matcher
readRecent bool
callback startTimeCallback
expectedQuery *prompb.Query
expectedSeries []labels.Labels
}{ }{
{ {
requiredMatchers: []*labels.Matcher{}, name: "empty",
mint: 1, maxt: 2,
matchers: []*labels.Matcher{ matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"), labels.MustNewMatcher(labels.MatchNotEqual, "a", "something"),
}, },
seriesSet: mockSeriesSet{}, readRecent: true,
},
{
requiredMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"),
},
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"),
},
seriesSet: mockSeriesSet{},
},
{
requiredMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"),
},
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, "special", "label"),
},
seriesSet: storage.NoopSeriesSet(),
},
{
requiredMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"),
},
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "different"),
},
seriesSet: storage.NoopSeriesSet(),
},
{
requiredMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"),
},
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
seriesSet: mockSeriesSet{},
},
{
requiredMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "baz"),
},
seriesSet: storage.NoopSeriesSet(),
},
{
requiredMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "special", "label"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
seriesSet: mockSeriesSet{},
},
}
for i, test := range tests { expectedQuery: &prompb.Query{
q := &requiredMatchersQuerier{ StartTimestampMs: 1,
Querier: mockQuerier{}, EndTimestampMs: 2,
requiredMatchers: test.requiredMatchers, Matchers: []*prompb.LabelMatcher{
} {Type: prompb.LabelMatcher_NEQ, Name: "a", Value: "something"},
},
},
expectedSeries: []labels.Labels{
labels.FromStrings("a", "b"),
labels.FromStrings("a", "b2", "region", "europe"),
labels.FromStrings("a", "b3", "region", "us"),
},
},
{
name: "external labels specified, not explicitly requested",
mint: 1, maxt: 2,
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "a", "something"),
},
readRecent: true,
externalLabels: labels.Labels{
{Name: "region", Value: "europe"},
},
have := q.Select(false, nil, test.matchers...) expectedQuery: &prompb.Query{
StartTimestampMs: 1,
EndTimestampMs: 2,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_NEQ, Name: "a", Value: "something"},
{Type: prompb.LabelMatcher_EQ, Name: "region", Value: "europe"},
},
},
expectedSeries: []labels.Labels{
labels.FromStrings("a", "b"),
labels.FromStrings("a", "b2"),
},
},
{
name: "external labels specified, explicitly requested europe",
mint: 1, maxt: 2,
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "a", "something"),
labels.MustNewMatcher(labels.MatchEqual, "region", "europe"),
},
readRecent: true,
externalLabels: labels.Labels{
{Name: "region", Value: "europe"},
},
if want := test.seriesSet; want != have { expectedQuery: &prompb.Query{
t.Errorf("%d. expected series set %+v, got %+v", i, want, have) StartTimestampMs: 1,
} EndTimestampMs: 2,
if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) { Matchers: []*prompb.LabelMatcher{
t.Errorf("%d. requiredMatchersQuerier.Select() has modified the matchers", i) {Type: prompb.LabelMatcher_NEQ, Name: "a", Value: "something"},
{Type: prompb.LabelMatcher_EQ, Name: "region", Value: "europe"},
},
},
expectedSeries: []labels.Labels{
labels.FromStrings("a", "b"),
labels.FromStrings("a", "b2", "region", "europe"),
},
},
{
name: "external labels specified, explicitly requested not europe",
mint: 1, maxt: 2,
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "a", "something"),
labels.MustNewMatcher(labels.MatchEqual, "region", "us"),
},
readRecent: true,
externalLabels: labels.Labels{
{Name: "region", Value: "europe"},
},
expectedQuery: &prompb.Query{
StartTimestampMs: 1,
EndTimestampMs: 2,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_NEQ, Name: "a", Value: "something"},
{Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"},
},
},
expectedSeries: []labels.Labels{
labels.FromStrings("a", "b"),
labels.FromStrings("a", "b3", "region", "us"),
},
},
{
name: "prefer local storage",
mint: 0, maxt: 50,
callback: func() (i int64, err error) { return 100, nil },
readRecent: false,
expectedQuery: &prompb.Query{
StartTimestampMs: 0,
EndTimestampMs: 50,
Matchers: []*prompb.LabelMatcher{},
},
expectedSeries: []labels.Labels{
labels.FromStrings("a", "b"),
labels.FromStrings("a", "b2", "region", "europe"),
labels.FromStrings("a", "b3", "region", "us"),
},
},
{
name: "prefer local storage, limited time",
mint: 0, maxt: 50,
callback: func() (i int64, err error) { return 20, nil },
readRecent: false,
expectedQuery: &prompb.Query{
StartTimestampMs: 0,
EndTimestampMs: 20,
Matchers: []*prompb.LabelMatcher{},
},
expectedSeries: []labels.Labels{
labels.FromStrings("a", "b"),
labels.FromStrings("a", "b2", "region", "europe"),
labels.FromStrings("a", "b3", "region", "us"),
},
},
{
name: "prefer local storage, skipped",
mint: 30, maxt: 50,
callback: func() (i int64, err error) { return 20, nil },
readRecent: false,
expectedQuery: nil,
expectedSeries: nil, // Noop should be used.
},
{
name: "required matcher specified, user also specifies same",
mint: 1, maxt: 2,
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "a", "b2"),
},
readRecent: true,
requiredMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "a", "b2"),
},
expectedQuery: &prompb.Query{
StartTimestampMs: 1,
EndTimestampMs: 2,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "a", Value: "b2"},
},
},
expectedSeries: []labels.Labels{
labels.FromStrings("a", "b2", "region", "europe"),
},
},
{
name: "required matcher specified",
mint: 1, maxt: 2,
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "a", "b2"),
},
readRecent: true,
requiredMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "a", "b2"),
},
expectedQuery: &prompb.Query{
StartTimestampMs: 1,
EndTimestampMs: 2,
Matchers: []*prompb.LabelMatcher{
{Type: prompb.LabelMatcher_EQ, Name: "a", Value: "b2"},
},
},
expectedSeries: []labels.Labels{
labels.FromStrings("a", "b2", "region", "europe"),
},
},
{
name: "required matcher specified, given matcher does not match",
mint: 1, maxt: 2,
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "a", "something"),
},
readRecent: true,
requiredMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "a", "b2"),
},
expectedQuery: nil,
expectedSeries: nil, // Given matchers does not match with required ones, noop expected.
},
{
name: "required matcher specified, given matcher does not match2",
mint: 1, maxt: 2,
matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "x", "something"),
},
readRecent: true,
requiredMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "a", "b2"),
},
expectedQuery: nil,
expectedSeries: nil, // Given matchers does not match with required ones, noop expected.
},
} {
t.Run(tc.name, func(t *testing.T) {
m.reset()
c := NewSampleAndChunkQueryableClient(
m,
tc.externalLabels,
tc.requiredMatchers,
tc.readRecent,
tc.callback,
)
q, err := c.Querier(context.TODO(), tc.mint, tc.maxt)
testutil.Ok(t, err)
defer testutil.Ok(t, q.Close())
ss := q.Select(true, nil, tc.matchers...)
testutil.Ok(t, err)
testutil.Equals(t, storage.Warnings(nil), ss.Warnings())
testutil.Equals(t, tc.expectedQuery, m.got)
var got []labels.Labels
for ss.Next() {
got = append(got, ss.At().Labels())
} }
testutil.Ok(t, ss.Err())
testutil.Equals(t, tc.expectedSeries, got)
})
} }
} }

View File

@ -51,8 +51,8 @@ type Storage struct {
rws *WriteStorage rws *WriteStorage
// For reads // For reads.
queryables []storage.Queryable queryables []storage.SampleAndChunkQueryable
localStartTimeCallback startTimeCallback localStartTimeCallback startTimeCallback
} }
@ -61,6 +61,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
} }
s := &Storage{ s := &Storage{
logger: logging.Dedupe(l, 1*time.Minute), logger: logging.Dedupe(l, 1*time.Minute),
localStartTimeCallback: stCallback, localStartTimeCallback: stCallback,
@ -80,7 +81,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
// Update read clients // Update read clients
readHashes := make(map[string]struct{}) readHashes := make(map[string]struct{})
queryables := make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) queryables := make([]storage.SampleAndChunkQueryable, 0, len(conf.RemoteReadConfigs))
for _, rrConf := range conf.RemoteReadConfigs { for _, rrConf := range conf.RemoteReadConfigs {
hash, err := toHash(rrConf) hash, err := toHash(rrConf)
if err != nil { if err != nil {
@ -96,12 +97,12 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
// Set the queue name to the config hash if the user has not set // Set the queue name to the config hash if the user has not set
// a name in their remote write config so we can still differentiate // a name in their remote write config so we can still differentiate
// between queues that have the same remote write endpoint. // between queues that have the same remote write endpoint.
name := string(hash[:6]) name := hash[:6]
if rrConf.Name != "" { if rrConf.Name != "" {
name = rrConf.Name name = rrConf.Name
} }
c, err := NewClient(name, &ClientConfig{ c, err := newReadClient(name, &ClientConfig{
URL: rrConf.URL, URL: rrConf.URL,
Timeout: rrConf.RemoteTimeout, Timeout: rrConf.RemoteTimeout,
HTTPClientConfig: rrConf.HTTPClientConfig, HTTPClientConfig: rrConf.HTTPClientConfig,
@ -110,15 +111,13 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
return err return err
} }
q := QueryableClient(c) queryables = append(queryables, NewSampleAndChunkQueryableClient(
q = ExternalLabelsHandler(q, conf.GlobalConfig.ExternalLabels) c,
if len(rrConf.RequiredMatchers) > 0 { conf.GlobalConfig.ExternalLabels,
q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers)) labelsToEqualityMatchers(rrConf.RequiredMatchers),
} rrConf.ReadRecent,
if !rrConf.ReadRecent { s.localStartTimeCallback,
q = PreferLocalStorageFilter(q, s.localStartTimeCallback) ))
}
queryables = append(queryables, q)
} }
s.queryables = queryables s.queryables = queryables
@ -148,7 +147,25 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie
} }
queriers = append(queriers, q) queriers = append(queriers, q)
} }
return storage.NewMergeQuerier(nil, queriers, storage.ChainedSeriesMerge), nil return storage.NewMergeQuerier(storage.NoopQuerier(), queriers, storage.ChainedSeriesMerge), nil
}
// ChunkQuerier returns a storage.MergeQuerier combining the remote client queriers
// of each configured remote read endpoint.
func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
s.mtx.Lock()
queryables := s.queryables
s.mtx.Unlock()
queriers := make([]storage.ChunkQuerier, 0, len(queryables))
for _, queryable := range queryables {
q, err := queryable.ChunkQuerier(ctx, mint, maxt)
if err != nil {
return nil, err
}
queriers = append(queriers, q)
}
return storage.NewMergeChunkQuerier(storage.NoopChunkedQuerier(), queriers, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil
} }
// Appender implements storage.Storage. // Appender implements storage.Storage.

View File

@ -113,12 +113,12 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
// Set the queue name to the config hash if the user has not set // Set the queue name to the config hash if the user has not set
// a name in their remote write config so we can still differentiate // a name in their remote write config so we can still differentiate
// between queues that have the same remote write endpoint. // between queues that have the same remote write endpoint.
name := string(hash[:6]) name := hash[:6]
if rwConf.Name != "" { if rwConf.Name != "" {
name = rwConf.Name name = rwConf.Name
} }
c, err := NewClient(name, &ClientConfig{ c, err := NewWriteClient(name, &ClientConfig{
URL: rwConf.URL, URL: rwConf.URL,
Timeout: rwConf.RemoteTimeout, Timeout: rwConf.RemoteTimeout,
HTTPClientConfig: rwConf.HTTPClientConfig, HTTPClientConfig: rwConf.HTTPClientConfig,

View File

@ -14,6 +14,7 @@
package storage package storage
import ( import (
"math"
"sort" "sort"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
@ -23,23 +24,34 @@ import (
) )
type listSeriesIterator struct { type listSeriesIterator struct {
samples []tsdbutil.Sample samples Samples
idx int idx int
} }
type samples []tsdbutil.Sample
func (s samples) Get(i int) tsdbutil.Sample { return s[i] }
func (s samples) Len() int { return len(s) }
// Samples interface allows to work on arrays of types that are compatible with tsdbutil.Sample.
type Samples interface {
Get(i int) tsdbutil.Sample
Len() int
}
// NewListSeriesIterator returns listSeriesIterator that allows to iterate over provided samples. Does not handle overlaps. // NewListSeriesIterator returns listSeriesIterator that allows to iterate over provided samples. Does not handle overlaps.
func NewListSeriesIterator(samples []tsdbutil.Sample) chunkenc.Iterator { func NewListSeriesIterator(samples Samples) chunkenc.Iterator {
return &listSeriesIterator{samples: samples, idx: -1} return &listSeriesIterator{samples: samples, idx: -1}
} }
func (it *listSeriesIterator) At() (int64, float64) { func (it *listSeriesIterator) At() (int64, float64) {
s := it.samples[it.idx] s := it.samples.Get(it.idx)
return s.T(), s.V() return s.T(), s.V()
} }
func (it *listSeriesIterator) Next() bool { func (it *listSeriesIterator) Next() bool {
it.idx++ it.idx++
return it.idx < len(it.samples) return it.idx < it.samples.Len()
} }
func (it *listSeriesIterator) Seek(t int64) bool { func (it *listSeriesIterator) Seek(t int64) bool {
@ -47,12 +59,12 @@ func (it *listSeriesIterator) Seek(t int64) bool {
it.idx = 0 it.idx = 0
} }
// Do binary search between current position and end. // Do binary search between current position and end.
it.idx = sort.Search(len(it.samples)-it.idx, func(i int) bool { it.idx = sort.Search(it.samples.Len()-it.idx, func(i int) bool {
s := it.samples[i+it.idx] s := it.samples.Get(i + it.idx)
return s.T() >= t return s.T() >= t
}) })
return it.idx < len(it.samples) return it.idx < it.samples.Len()
} }
func (it *listSeriesIterator) Err() error { return nil } func (it *listSeriesIterator) Err() error { return nil }
@ -84,7 +96,6 @@ type chunkSetToSeriesSet struct {
chkIterErr error chkIterErr error
sameSeriesChunks []Series sameSeriesChunks []Series
bufIterator chunkenc.Iterator
} }
// NewSeriesSetFromChunkSeriesSet converts ChunkSeriesSet to SeriesSet by decoding chunks one by one. // NewSeriesSetFromChunkSeriesSet converts ChunkSeriesSet to SeriesSet by decoding chunks one by one.
@ -101,10 +112,9 @@ func (c *chunkSetToSeriesSet) Next() bool {
c.sameSeriesChunks = c.sameSeriesChunks[:0] c.sameSeriesChunks = c.sameSeriesChunks[:0]
for iter.Next() { for iter.Next() {
c.sameSeriesChunks = append(c.sameSeriesChunks, &chunkToSeries{ c.sameSeriesChunks = append(c.sameSeriesChunks, &chunkToSeriesDecoder{
labels: c.ChunkSeriesSet.At().Labels(), labels: c.ChunkSeriesSet.At().Labels(),
chk: iter.At(), Meta: iter.At(),
buf: c.bufIterator,
}) })
} }
@ -128,11 +138,82 @@ func (c *chunkSetToSeriesSet) Err() error {
return c.ChunkSeriesSet.Err() return c.ChunkSeriesSet.Err()
} }
type chunkToSeries struct { type chunkToSeriesDecoder struct {
chunks.Meta
labels labels.Labels labels labels.Labels
chk chunks.Meta
buf chunkenc.Iterator
} }
func (s *chunkToSeries) Labels() labels.Labels { return s.labels } func (s *chunkToSeriesDecoder) Labels() labels.Labels { return s.labels }
func (s *chunkToSeries) Iterator() chunkenc.Iterator { return s.chk.Chunk.Iterator(s.buf) }
// TODO(bwplotka): Can we provide any chunkenc buffer?
func (s *chunkToSeriesDecoder) Iterator() chunkenc.Iterator { return s.Chunk.Iterator(nil) }
type seriesSetToChunkSet struct {
SeriesSet
}
// NewSeriesSetToChunkSet converts SeriesSet to ChunkSeriesSet by encoding chunks from samples.
func NewSeriesSetToChunkSet(chk SeriesSet) ChunkSeriesSet {
return &seriesSetToChunkSet{SeriesSet: chk}
}
func (c *seriesSetToChunkSet) Next() bool {
if c.Err() != nil || !c.SeriesSet.Next() {
return false
}
return true
}
func (c *seriesSetToChunkSet) At() ChunkSeries {
return &seriesToChunkEncoder{
Series: c.SeriesSet.At(),
}
}
func (c *seriesSetToChunkSet) Err() error {
return c.SeriesSet.Err()
}
type seriesToChunkEncoder struct {
Series
}
// TODO(bwplotka): Currently encoder will just naively build one chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
chk := chunkenc.NewXORChunk()
app, err := chk.Appender()
if err != nil {
return errChunksIterator{err: err}
}
mint := int64(math.MaxInt64)
maxt := int64(math.MinInt64)
seriesIter := s.Series.Iterator()
for seriesIter.Next() {
t, v := seriesIter.At()
app.Append(t, v)
maxt = t
if mint == math.MaxInt64 {
mint = t
}
}
if err := seriesIter.Err(); err != nil {
return errChunksIterator{err: err}
}
return NewListChunkSeriesIterator(chunks.Meta{
MinTime: mint,
MaxTime: maxt,
Chunk: chk,
})
}
type errChunksIterator struct {
err error
}
func (e errChunksIterator) At() chunks.Meta { return chunks.Meta{} }
func (e errChunksIterator) Next() bool { return false }
func (e errChunksIterator) Err() error { return e.err }

View File

@ -27,11 +27,11 @@ type MockSeries struct {
SampleIteratorFn func() chunkenc.Iterator SampleIteratorFn func() chunkenc.Iterator
} }
func NewListSeries(lset labels.Labels, samples []tsdbutil.Sample) *MockSeries { func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *MockSeries {
return &MockSeries{ return &MockSeries{
labels: lset, labels: lset,
SampleIteratorFn: func() chunkenc.Iterator { SampleIteratorFn: func() chunkenc.Iterator {
return NewListSeriesIterator(samples) return NewListSeriesIterator(samples(s))
}, },
} }
} }

View File

@ -39,11 +39,11 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/wal"
"golang.org/x/sync/errgroup"
// Load the package into main to make sure minium Go version is met. // Load the package into main to make sure minium Go version is met.
_ "github.com/prometheus/prometheus/tsdb/goversion" _ "github.com/prometheus/prometheus/tsdb/goversion"
"github.com/prometheus/prometheus/tsdb/wal"
"golang.org/x/sync/errgroup"
) )
const ( const (
@ -420,6 +420,11 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu
return dbWritable.Querier(ctx, mint, maxt) return dbWritable.Querier(ctx, mint, maxt)
} }
func (db *DBReadOnly) ChunkQuerier(context.Context, int64, int64) (storage.ChunkQuerier, error) {
// TODO(bwplotka): Implement in next PR.
return nil, errors.New("not implemented")
}
// Blocks returns a slice of block readers for persisted blocks. // Blocks returns a slice of block readers for persisted blocks.
func (db *DBReadOnly) Blocks() ([]BlockReader, error) { func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
select { select {
@ -1345,6 +1350,11 @@ func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, err
}, nil }, nil
} }
func (db *DB) ChunkQuerier(context.Context, int64, int64) (storage.ChunkQuerier, error) {
// TODO(bwplotka): Implement in next PR.
return nil, errors.New("not implemented")
}
func rangeForTimestamp(t int64, width int64) (maxt int64) { func rangeForTimestamp(t int64, width int64) (maxt int64) {
return (t/width)*width + width return (t/width)*width + width
} }

View File

@ -172,6 +172,7 @@ type TSDBAdminStats interface {
// API can register a set of endpoints in a router and handle // API can register a set of endpoints in a router and handle
// them using the provided storage and query engine. // them using the provided storage and query engine.
type API struct { type API struct {
// TODO(bwplotka): Change to SampleAndChunkQueryable in next PR.
Queryable storage.Queryable Queryable storage.Queryable
QueryEngine *promql.Engine QueryEngine *promql.Engine
@ -204,7 +205,7 @@ func init() {
// NewAPI returns an initialized API type. // NewAPI returns an initialized API type.
func NewAPI( func NewAPI(
qe *promql.Engine, qe *promql.Engine,
q storage.Queryable, q storage.SampleAndChunkQueryable,
tr func(context.Context) TargetRetriever, tr func(context.Context) TargetRetriever,
ar func(context.Context) AlertmanagerRetriever, ar func(context.Context) AlertmanagerRetriever,
configFunc func() config.Config, configFunc func() config.Config,
@ -1199,8 +1200,8 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels)) sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels))
for name, value := range externalLabels { for name, value := range externalLabels {
sortedExternalLabels = append(sortedExternalLabels, prompb.Label{ sortedExternalLabels = append(sortedExternalLabels, prompb.Label{
Name: string(name), Name: name,
Value: string(value), Value: value,
}) })
} }
sort.Slice(sortedExternalLabels, func(i, j int) bool { sort.Slice(sortedExternalLabels, func(i, j int) bool {
@ -1215,39 +1216,13 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
switch responseType { switch responseType {
case prompb.ReadRequest_STREAMED_XOR_CHUNKS: case prompb.ReadRequest_STREAMED_XOR_CHUNKS:
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") api.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedExternalLabels)
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError)
return
}
for i, query := range req.Queries {
ws, err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) (storage.Warnings, error) {
// The streaming API has to provide the series sorted.
set := querier.Select(true, hints, filteredMatchers...)
return remote.StreamChunkedReadResponses(
remote.NewChunkedWriter(w, f),
int64(i),
set,
sortedExternalLabels,
api.remoteReadMaxBytesInFrame,
)
})
for _, w := range ws {
level.Warn(api.logger).Log("msg", "warnings on remote read query", "err", w.Error())
}
if err != nil {
if httpErr, ok := err.(remote.HTTPError); ok {
http.Error(w, httpErr.Error(), httpErr.Status())
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
default: default:
api.remoteReadSamples(ctx, w, req, externalLabels, sortedExternalLabels)
}
}
func (api *API) remoteReadSamples(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) {
w.Header().Set("Content-Type", "application/x-protobuf") w.Header().Set("Content-Type", "application/x-protobuf")
w.Header().Set("Content-Encoding", "snappy") w.Header().Set("Content-Encoding", "snappy")
@ -1256,27 +1231,51 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
Results: make([]*prompb.QueryResult, len(req.Queries)), Results: make([]*prompb.QueryResult, len(req.Queries)),
} }
for i, query := range req.Queries { for i, query := range req.Queries {
ws, err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) (storage.Warnings, error) { if err := func() error {
set := querier.Select(false, hints, filteredMatchers...) filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
var (
ws storage.Warnings
err error
)
resp.Results[i], ws, err = remote.ToQueryResult(set, api.remoteReadSampleLimit)
if err != nil { if err != nil {
return ws, err return err
}
querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
if err != nil {
return err
}
defer func() {
if err := querier.Close(); err != nil {
level.Warn(api.logger).Log("msg", "Error on querier close", "err", err.Error())
}
}()
var hints *storage.SelectHints
if query.Hints != nil {
hints = &storage.SelectHints{
Start: query.Hints.StartMs,
End: query.Hints.EndMs,
Step: query.Hints.StepMs,
Func: query.Hints.Func,
Grouping: query.Hints.Grouping,
Range: query.Hints.RangeMs,
By: query.Hints.By,
}
}
var ws storage.Warnings
resp.Results[i], ws, err = remote.ToQueryResult(querier.Select(false, hints, filteredMatchers...), api.remoteReadSampleLimit)
if err != nil {
return err
}
for _, w := range ws {
level.Warn(api.logger).Log("msg", "Warnings on remote read query", "err", w.Error())
} }
for _, ts := range resp.Results[i].Timeseries { for _, ts := range resp.Results[i].Timeseries {
ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels) ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels)
} }
return ws, nil
}) return nil
for _, w := range ws { }(); err != nil {
level.Warn(api.logger).Log("msg", "warnings on remote read query", "err", w.Error())
}
if err != nil {
if httpErr, ok := err.(remote.HTTPError); ok { if httpErr, ok := err.(remote.HTTPError); ok {
http.Error(w, httpErr.Error(), httpErr.Status()) http.Error(w, httpErr.Error(), httpErr.Status())
return return
@ -1291,6 +1290,72 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
return return
} }
} }
func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) {
w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError)
return
}
for i, query := range req.Queries {
if err := func() error {
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
if err != nil {
return err
}
// TODO(bwplotka): Use ChunkQuerier once ready in tsdb package.
querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
if err != nil {
return err
}
defer func() {
if err := querier.Close(); err != nil {
level.Warn(api.logger).Log("msg", "Error on chunk querier close", "warnings", err.Error())
}
}()
var hints *storage.SelectHints
if query.Hints != nil {
hints = &storage.SelectHints{
Start: query.Hints.StartMs,
End: query.Hints.EndMs,
Step: query.Hints.StepMs,
Func: query.Hints.Func,
Grouping: query.Hints.Grouping,
Range: query.Hints.RangeMs,
By: query.Hints.By,
}
}
ws, err := remote.DeprecatedStreamChunkedReadResponses(
remote.NewChunkedWriter(w, f),
int64(i),
// The streaming API has to provide the series sorted.
querier.Select(true, hints, filteredMatchers...),
sortedExternalLabels,
api.remoteReadMaxBytesInFrame,
)
if err != nil {
return err
}
for _, w := range ws {
level.Warn(api.logger).Log("msg", "Warnings on chunked remote read query", "warnings", w.Error())
}
return nil
}(); err != nil {
if httpErr, ok := err.(remote.HTTPError); ok {
http.Error(w, httpErr.Error(), httpErr.Status())
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
} }
// filterExtLabelsFromMatchers change equality matchers which match external labels // filterExtLabelsFromMatchers change equality matchers which match external labels
@ -1319,37 +1384,6 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe
return filteredMatchers, nil return filteredMatchers, nil
} }
func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) (storage.Warnings, error)) (storage.Warnings, error) {
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
if err != nil {
return nil, err
}
querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
if err != nil {
return nil, err
}
defer func() {
if err := querier.Close(); err != nil {
level.Warn(api.logger).Log("msg", "Error on querier close", "err", err.Error())
}
}()
var hints *storage.SelectHints
if query.Hints != nil {
hints = &storage.SelectHints{
Start: query.Hints.StartMs,
End: query.Hints.EndMs,
Step: query.Hints.StepMs,
Func: query.Hints.Func,
Grouping: query.Hints.Grouping,
Range: query.Hints.RangeMs,
By: query.Hints.By,
}
}
return seriesHandleFn(querier, hints, filteredMatchers)
}
func (api *API) deleteSeries(r *http.Request) apiFuncResult { func (api *API) deleteSeries(r *http.Request) apiFuncResult {
if !api.enableAdmin { if !api.enableAdmin {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil} return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil}

View File

@ -360,9 +360,7 @@ func TestEndpoints(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(dbDir) defer os.RemoveAll(dbDir)
remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) { remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, nil, dbDir, 1*time.Second)
return 0, nil
}, dbDir, 1*time.Second)
err = remote.ApplyConfig(&config.Config{ err = remote.ApplyConfig(&config.Config{
RemoteReadConfigs: []*config.RemoteReadConfig{ RemoteReadConfigs: []*config.RemoteReadConfig{