From b788986717e1597452ca25e5219510bb787165c7 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 24 Jun 2020 15:41:52 +0200 Subject: [PATCH] storage: Adjusted fully storage layer support for chunk iterators: Remote read client, readyStorage, fanout. (#7059) * Fixed nits introduced by https://github.com/prometheus/prometheus/pull/7334 * Added ChunkQueryable implementation to fanout and readyStorage. * Added more comments. * Changed NewVerticalChunkSeriesMerger to NewCompactingChunkSeriesMerger, removed tiny interface by reusing VerticalSeriesMergeFunc for overlapping algorithm for both chunks and series, for both querying and compacting (!) + made sure duplicates are merged. * Added ErrChunkSeriesSet * Added Samples interface for seamless []prompb.Sample to []tsdbutil.Sample conversion. * Deprecating non-chunk series set based StreamChunkedReadResponses, added chunk one. * Improved tests. * Split remote client into Write (old storage) and read. * Queryable client is now SampleAndChunkQueryable. Since we cannot use nice QueryableFunc, I moved all config based options to sampleAndChunkQueryableClient to avoid boilerplate. In next commit: Changes for TSDB. 
Signed-off-by: Bartlomiej Plotka --- cmd/prometheus/main.go | 8 + storage/buffer_test.go | 3 +- storage/fanout.go | 209 +++++++----- storage/fanout/fanout_test.go | 160 ++++++--- storage/fanout_test.go | 51 ++- storage/generic.go | 12 +- storage/interface.go | 43 ++- storage/remote/client.go | 100 ++++-- storage/remote/client_test.go | 3 +- storage/remote/codec.go | 91 ++++- storage/remote/queue_manager.go | 14 +- storage/remote/queue_manager_test.go | 64 ++-- storage/remote/read.go | 325 +++++++++--------- storage/remote/read_test.go | 490 ++++++++++++++++----------- storage/remote/storage.go | 47 ++- storage/remote/write.go | 4 +- storage/series.go | 113 +++++- storage/series_test.go | 4 +- tsdb/db.go | 14 +- web/api/v1/api.go | 232 +++++++------ web/api/v1/api_test.go | 4 +- 21 files changed, 1255 insertions(+), 736 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index d7342499f..342b39f73 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -961,6 +961,14 @@ func (s *readyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Q return nil, tsdb.ErrNotReady } +// ChunkQuerier implements the Storage interface. +func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + if x := s.get(); x != nil { + return x.ChunkQuerier(ctx, mint, maxt) + } + return nil, tsdb.ErrNotReady +} + // Appender implements the Storage interface. 
func (s *readyStorage) Appender() storage.Appender { if x := s.get(); x != nil { diff --git a/storage/buffer_test.go b/storage/buffer_test.go index dc69568f1..58e71acd0 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -17,7 +17,6 @@ import ( "math/rand" "testing" - "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/testutil" ) @@ -105,7 +104,7 @@ func TestBufferedSeriesIterator(t *testing.T) { testutil.Equals(t, ev, v, "value mismatch") } - it = NewBufferIterator(NewListSeriesIterator([]tsdbutil.Sample{ + it = NewBufferIterator(NewListSeriesIterator(samples{ sample{t: 1, v: 2}, sample{t: 2, v: 3}, sample{t: 3, v: 4}, diff --git a/storage/fanout.go b/storage/fanout.go index 73df5c8c1..35ae7e9a4 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -14,6 +14,7 @@ package storage import ( + "bytes" "container/heap" "context" "sort" @@ -24,7 +25,6 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -94,12 +94,34 @@ func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) } return nil, errs.Err() } - secondaries = append(secondaries, querier) } return NewMergeQuerier(primary, secondaries, ChainedSeriesMerge), nil } +func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, error) { + primary, err := f.primary.ChunkQuerier(ctx, mint, maxt) + if err != nil { + return nil, err + } + + secondaries := make([]ChunkQuerier, 0, len(f.secondaries)) + for _, storage := range f.secondaries { + querier, err := storage.ChunkQuerier(ctx, mint, maxt) + if err != nil { + // Close already open Queriers, append potential errors to returned error. 
+ errs := tsdb_errors.MultiError{err} + errs.Add(primary.Close()) + for _, q := range secondaries { + errs.Add(q.Close()) + } + return nil, errs.Err() + } + secondaries = append(secondaries, querier) + } + return NewMergeChunkQuerier(primary, secondaries, NewCompactingChunkSeriesMerger(ChainedSeriesMerge)), nil +} + func (f *fanout) Appender() Appender { primary := f.primary.Appender() secondaries := make([]Appender, 0, len(f.secondaries)) @@ -220,7 +242,7 @@ func NewMergeQuerier(primary Querier, secondaries []Querier, mergeFn VerticalSer // // In case of overlaps between the data given by primary + secondaries Selects, merge function will be used. // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670 -func NewMergeChunkQuerier(primary ChunkQuerier, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergerFunc) ChunkQuerier { +func NewMergeChunkQuerier(primary ChunkQuerier, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier { queriers := make([]genericQuerier, 0, len(secondaries)+1) if primary != nil { queriers = append(queriers, newGenericQuerierFromChunk(primary)) @@ -232,7 +254,7 @@ func NewMergeChunkQuerier(primary ChunkQuerier, secondaries []ChunkQuerier, merg } return &chunkQuerierAdapter{&mergeGenericQuerier{ - mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: mergeFn}).Merge, + mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFn}).Merge, queriers: queriers, }} } @@ -270,7 +292,7 @@ func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matche return &lazySeriesSet{create: create(seriesSets, q.mergeFn)} } -func create(seriesSets []genericSeriesSet, mergeFn genericSeriesMergeFunc) func() (genericSeriesSet, bool) { +func create(seriesSets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) func() (genericSeriesSet, bool) { // Returned function gets called with the 
first call to Next(). return func() (genericSeriesSet, bool) { if len(seriesSets) == 1 { @@ -292,9 +314,9 @@ func create(seriesSets []genericSeriesSet, mergeFn genericSeriesMergeFunc) func( } } set := &genericMergeSeriesSet{ - mergeFn: mergeFn, - sets: seriesSets, - heap: h, + mergeFunc: mergeFunc, + sets: seriesSets, + heap: h, } return set, set.Next() } @@ -361,8 +383,10 @@ func mergeTwoStringSlices(a, b []string) []string { // LabelNames returns all the unique label names present in the block in sorted order. func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) { - labelNamesMap := make(map[string]struct{}) - var warnings Warnings + var ( + labelNamesMap = make(map[string]struct{}) + warnings Warnings + ) for _, querier := range q.queriers { names, wrn, err := querier.LabelNames() if wrn != nil { @@ -403,34 +427,36 @@ func (q *mergeGenericQuerier) Close() error { // It has to handle time-overlapped series as well. type VerticalSeriesMergeFunc func(...Series) Series -// VerticalChunkSeriesMergerFunc returns merged chunk series implementation that merges series with same labels together. -// It has to handle time-overlapped chunk series as well. -type VerticalChunkSeriesMergerFunc func(...ChunkSeries) ChunkSeries - -// NewMergeSeriesSet returns a new SeriesSet that merges results of chkQuerierSeries SeriesSets. -func NewMergeSeriesSet(sets []SeriesSet, merger VerticalSeriesMergeFunc) SeriesSet { +// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together. 
+func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet { genericSets := make([]genericSeriesSet, 0, len(sets)) for _, s := range sets { genericSets = append(genericSets, &genericSeriesSetAdapter{s}) } - return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: merger}).Merge)} + return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)} } -// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges results of chkQuerierSeries ChunkSeriesSets. -func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, merger VerticalChunkSeriesMergerFunc) ChunkSeriesSet { +// VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping +// chunk series with the same labels into single ChunkSeries. +// +// NOTE: It's up to implementation how series are vertically merged (if chunks are sorted, re-encoded etc). +type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries + +// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together. +func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet { genericSets := make([]genericSeriesSet, 0, len(sets)) for _, s := range sets { genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s}) } - return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge)} + return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)} } // genericMergeSeriesSet implements genericSeriesSet. 
type genericMergeSeriesSet struct { currentLabels labels.Labels - mergeFn genericSeriesMergeFunc + mergeFunc genericSeriesMergeFunc heap genericSeriesSetHeap sets []genericSeriesSet @@ -441,8 +467,8 @@ type genericMergeSeriesSet struct { // series returned by the series sets when iterating. // Each series set must return its series in labels order, otherwise // merged series set will be incorrect. -// Overlapping cases are merged using provided mergeFn. -func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFn genericSeriesMergeFunc) genericSeriesSet { +// Overlapped situations are merged using provided mergeFunc. +func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet { if len(sets) == 1 { return sets[0] } @@ -459,9 +485,9 @@ func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFn genericSeriesMerg } } return &genericMergeSeriesSet{ - mergeFn: mergeFn, - sets: sets, - heap: h, + mergeFunc: mergeFunc, + sets: sets, + heap: h, } } @@ -507,7 +533,7 @@ func (c *genericMergeSeriesSet) At() Labels { for _, seriesSet := range c.currentSets { series = append(series, seriesSet.At()) } - return c.mergeFn(series...) + return c.mergeFunc(series...) } func (c *genericMergeSeriesSet) Err() error { @@ -549,10 +575,16 @@ func (h *genericSeriesSetHeap) Pop() interface{} { return x } -// ChainedSeriesMerge returns single series from many same series by chaining samples together. -// In case of the timestamp overlap, the first overlapped sample is kept and the rest samples with the same timestamps -// are dropped. We expect the same labels for each given series. -// TODO(bwplotka): This has the same logic as tsdb.verticalChainedSeries. Remove this in favor of ChainedSeriesMerge in next PRs. +// ChainedSeriesMerge returns single series from many same, potentially overlapping series by chaining samples together. 
+// If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same +// timestamp are dropped. +// +// This works the best with replicated series, where data from two series are exactly the same. This does not work well +// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective +// this never happens. +// +// NOTE: Use this only when you see potentially overlapping series, as this introduces small overhead to handle overlaps +// between series. func ChainedSeriesMerge(s ...Series) Series { if len(s) == 0 { return nil @@ -580,8 +612,9 @@ func (m *chainSeries) Iterator() chunkenc.Iterator { return newChainSampleIterator(iterators) } -// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series. -// If one or more samples overlap, the first one is kept and all others with the same timestamp are dropped. +// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps +// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same +// timestamp are dropped. type chainSampleIterator struct { iterators []chunkenc.Iterator h samplesIteratorHeap @@ -645,12 +678,13 @@ func (c *chainSampleIterator) Next() bool { } func (c *chainSampleIterator) Err() error { + var errs tsdb_errors.MultiError for _, iter := range c.iterators { if err := iter.Err(); err != nil { - return err + errs.Add(err) } } - return nil + return errs.Err() } type samplesIteratorHeap []chunkenc.Iterator @@ -676,77 +710,76 @@ func (h *samplesIteratorHeap) Pop() interface{} { return x } -// VerticalChunkMergeFunc represents a function that merges multiple time overlapping chunks. -// Passed chunks: -// * have to be sorted by MinTime. -// * have to be part of exactly the same timeseries. -// * have to be populated. 
-type VerticalChunksMergeFunc func(chks ...chunks.Meta) chunks.Iterator - -type verticalChunkSeriesMerger struct { - verticalChunksMerger VerticalChunksMergeFunc +type compactChunkSeriesMerger struct { + mergeFunc VerticalSeriesMergeFunc labels labels.Labels series []ChunkSeries } -// NewVerticalChunkSeriesMerger returns VerticalChunkSeriesMerger that merges the same chunk series into one or more chunks. -// In case of the chunk overlap, given VerticalChunkMergeFunc will be used. +// NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series. +// In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data. +// Samples from overlapped chunks are merged using series vertical merge func. // It expects the same labels for each given series. -func NewVerticalChunkSeriesMerger(chunkMerger VerticalChunksMergeFunc) VerticalChunkSeriesMergerFunc { +// +// NOTE: Use this only when you see potentially overlapping series, as this introduces small overhead to handle overlaps +// between series. 
+func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc { return func(s ...ChunkSeries) ChunkSeries { if len(s) == 0 { return nil } - return &verticalChunkSeriesMerger{ - verticalChunksMerger: chunkMerger, - labels: s[0].Labels(), - series: s, + return &compactChunkSeriesMerger{ + mergeFunc: mergeFunc, + labels: s[0].Labels(), + series: s, } } } -func (s *verticalChunkSeriesMerger) Labels() labels.Labels { +func (s *compactChunkSeriesMerger) Labels() labels.Labels { return s.labels } -func (s *verticalChunkSeriesMerger) Iterator() chunks.Iterator { +func (s *compactChunkSeriesMerger) Iterator() chunks.Iterator { iterators := make([]chunks.Iterator, 0, len(s.series)) for _, series := range s.series { iterators = append(iterators, series.Iterator()) } - return &chainChunkIterator{ - overlappedChunksMerger: s.verticalChunksMerger, - iterators: iterators, - h: nil, + return &compactChunkIterator{ + mergeFunc: s.mergeFunc, + labels: s.labels, + iterators: iterators, } } -// chainChunkIterator is responsible to chain chunks from different iterators of same time series. -// If they are time overlapping overlappedChunksMerger will be used. -type chainChunkIterator struct { - overlappedChunksMerger VerticalChunksMergeFunc +// compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries. +// If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk. +// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. 
Split it: https://github.com/prometheus/tsdb/issues/670 +type compactChunkIterator struct { + mergeFunc VerticalSeriesMergeFunc + labels labels.Labels iterators []chunks.Iterator - h chunkIteratorHeap + + h chunkIteratorHeap } -func (c *chainChunkIterator) At() chunks.Meta { +func (c *compactChunkIterator) At() chunks.Meta { if len(c.h) == 0 { - panic("chainChunkIterator.At() called after .Next() returned false.") + panic("compactChunkIterator.At() called after .Next() returned false.") } return c.h[0].At() } -func (c *chainChunkIterator) Next() bool { +func (c *compactChunkIterator) Next() bool { if c.h == nil { for _, iter := range c.iterators { if iter.Next() { heap.Push(&c.h, iter) } } - return len(c.h) > 0 } @@ -754,41 +787,63 @@ func (c *chainChunkIterator) Next() bool { return false } - // Detect the shortest chain of time-overlapped chunks. + // Detect overlaps to compact. + // Be smart about it and deduplicate on the fly if chunks are identical. last := c.At() - var overlapped []chunks.Meta + var overlapped []Series for { iter := heap.Pop(&c.h).(chunks.Iterator) if iter.Next() { heap.Push(&c.h, iter) } - if len(c.h) == 0 { break } + // Get the current oldest chunk by min, then max time. next := c.At() if next.MinTime > last.MaxTime { // No overlap with last one. break } - overlapped = append(overlapped, last) + + if next.MinTime == last.MinTime && + next.MaxTime == last.MaxTime && + bytes.Equal(next.Chunk.Bytes(), last.Chunk.Bytes()) { + // 1:1 duplicates, skip last. + continue + } + + overlapped = append(overlapped, &chunkToSeriesDecoder{ + labels: c.labels, + Meta: last, + }) last = next } - if len(overlapped) > 0 { - heap.Push(&c.h, c.overlappedChunksMerger(append(overlapped, c.At())...)) - return true + + if len(overlapped) == 0 { + return len(c.h) > 0 } - return len(c.h) > 0 + + // Add last, not yet included overlap. 
+ overlapped = append(overlapped, &chunkToSeriesDecoder{ + labels: c.labels, + Meta: c.At(), + }) + + var chkSeries ChunkSeries = &seriesToChunkEncoder{Series: c.mergeFunc(overlapped...)} + heap.Push(&c.h, chkSeries) + return true } -func (c *chainChunkIterator) Err() error { +func (c *compactChunkIterator) Err() error { + var errs tsdb_errors.MultiError for _, iter := range c.iterators { if err := iter.Err(); err != nil { - return err + errs.Add(err) } } - return nil + return errs.Err() } type chunkIteratorHeap []chunks.Iterator diff --git a/storage/fanout/fanout_test.go b/storage/fanout/fanout_test.go index e648296e5..d263297ea 100644 --- a/storage/fanout/fanout_test.go +++ b/storage/fanout/fanout_test.go @@ -15,9 +15,9 @@ package storage import ( "context" - "errors" "testing" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -72,31 +72,60 @@ func TestSelectSorted(t *testing.T) { fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2) - querier, err := fanoutStorage.Querier(context.Background(), 0, 8000) - testutil.Ok(t, err) - defer querier.Close() + t.Run("querier", func(t *testing.T) { + querier, err := fanoutStorage.Querier(context.Background(), 0, 8000) + testutil.Ok(t, err) + defer querier.Close() - matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") - testutil.Ok(t, err) + matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") + testutil.Ok(t, err) - seriesSet := querier.Select(true, nil, matcher) + seriesSet := querier.Select(true, nil, matcher) - result := make(map[int64]float64) - var labelsResult labels.Labels - for seriesSet.Next() { - series := seriesSet.At() - seriesLabels := series.Labels() - labelsResult = seriesLabels - iterator := series.Iterator() - for iterator.Next() { - timestamp, value := iterator.At() - result[timestamp] = value + result := make(map[int64]float64) + var labelsResult labels.Labels 
+ for seriesSet.Next() { + series := seriesSet.At() + seriesLabels := series.Labels() + labelsResult = seriesLabels + iterator := series.Iterator() + for iterator.Next() { + timestamp, value := iterator.At() + result[timestamp] = value + } } - } - testutil.Ok(t, seriesSet.Err()) - testutil.Equals(t, labelsResult, outputLabel) - testutil.Equals(t, inputTotalSize, len(result)) + testutil.Equals(t, labelsResult, outputLabel) + testutil.Equals(t, inputTotalSize, len(result)) + }) + t.Run("chunk querier", func(t *testing.T) { + t.Skip("TODO(bwplotka: Unskip when db will implement ChunkQuerier.") + querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000) + testutil.Ok(t, err) + defer querier.Close() + + matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") + testutil.Ok(t, err) + + seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(true, nil, matcher)) + + result := make(map[int64]float64) + var labelsResult labels.Labels + for seriesSet.Next() { + series := seriesSet.At() + seriesLabels := series.Labels() + labelsResult = seriesLabels + iterator := series.Iterator() + for iterator.Next() { + timestamp, value := iterator.At() + result[timestamp] = value + } + } + + testutil.Ok(t, seriesSet.Err()) + testutil.Equals(t, labelsResult, outputLabel) + testutil.Equals(t, inputTotalSize, len(result)) + }) } func TestFanoutErrors(t *testing.T) { @@ -106,19 +135,19 @@ func TestFanoutErrors(t *testing.T) { cases := []struct { primary storage.Storage secondary storage.Storage - warnings storage.Warnings + warning error err error }{ { primary: workingStorage, secondary: errStorage{}, - warnings: storage.Warnings{errSelect}, + warning: errSelect, err: nil, }, { primary: errStorage{}, secondary: workingStorage, - warnings: nil, + warning: nil, err: errSelect, }, } @@ -126,17 +155,55 @@ func TestFanoutErrors(t *testing.T) { for _, tc := range cases { fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary) - querier, 
err := fanoutStorage.Querier(context.Background(), 0, 8000) - testutil.Ok(t, err) - defer querier.Close() + t.Run("samples", func(t *testing.T) { + querier, err := fanoutStorage.Querier(context.Background(), 0, 8000) + testutil.Ok(t, err) + defer querier.Close() - matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b") - ss := querier.Select(true, nil, matcher) - for ss.Next() { - ss.At() - } - testutil.Equals(t, tc.err, ss.Err()) - testutil.Equals(t, tc.warnings, ss.Warnings()) + matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b") + ss := querier.Select(true, nil, matcher) + + // Exhaust. + for ss.Next() { + ss.At() + } + + if tc.err != nil { + testutil.NotOk(t, ss.Err()) + testutil.Equals(t, tc.err.Error(), ss.Err().Error()) + } + + if tc.warning != nil { + testutil.Assert(t, len(ss.Warnings()) > 0, "warnings expected") + testutil.NotOk(t, ss.Warnings()[0]) + testutil.Equals(t, tc.warning.Error(), ss.Warnings()[0].Error()) + } + }) + t.Run("chunks", func(t *testing.T) { + t.Skip("enable once TestStorage and TSDB implements ChunkQuerier") + querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000) + testutil.Ok(t, err) + defer querier.Close() + + matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b") + ss := querier.Select(true, nil, matcher) + + // Exhaust. 
+ for ss.Next() { + ss.At() + } + + if tc.err != nil { + testutil.NotOk(t, ss.Err()) + testutil.Equals(t, tc.err.Error(), ss.Err().Error()) + } + + if tc.warning != nil { + testutil.Assert(t, len(ss.Warnings()) > 0, "warnings expected") + testutil.NotOk(t, ss.Warnings()[0]) + testutil.Equals(t, tc.warning.Error(), ss.Warnings()[0].Error()) + } + }) } } @@ -144,23 +211,20 @@ var errSelect = errors.New("select error") type errStorage struct{} +type errQuerier struct{} + func (errStorage) Querier(_ context.Context, _, _ int64) (storage.Querier, error) { return errQuerier{}, nil } -func (errStorage) Appender() storage.Appender { - return nil -} +type errChunkQuerier struct{ errQuerier } -func (errStorage) StartTime() (int64, error) { - return 0, nil +func (errStorage) ChunkQuerier(_ context.Context, _, _ int64) (storage.ChunkQuerier, error) { + return errChunkQuerier{}, nil } - -func (errStorage) Close() error { - return nil -} - -type errQuerier struct{} +func (errStorage) Appender() storage.Appender { return nil } +func (errStorage) StartTime() (int64, error) { return 0, nil } +func (errStorage) Close() error { return nil } func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { return storage.ErrSeriesSet(errSelect) @@ -174,6 +238,8 @@ func (errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, errors.New("label names error") } -func (errQuerier) Close() error { - return nil +func (errQuerier) Close() error { return nil } + +func (errChunkQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet { + return storage.ErrChunkSeriesSet(errSelect) } diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 307e11c61..dc68fd740 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -283,7 +283,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { ), }, { - name: "two queriers, one different series each", + name: "two secondaries, one 
different series each", chkQuerierSeries: [][]ChunkSeries{{ NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), }, { @@ -295,7 +295,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { ), }, { - name: "two queriers, two not in time order series each", + name: "two secondaries, two not in time order series each", chkQuerierSeries: [][]ChunkSeries{{ NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}), NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), @@ -319,7 +319,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { ), }, { - name: "five queriers, only two have two not in time order series each", + name: "five secondaries, only two have two not in time order series each", chkQuerierSeries: [][]ChunkSeries{{}, {}, { NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}), NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), @@ -343,7 +343,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { ), }, { - name: "two queriers, with two not in time order series each, with 3 noop queries and one nil together", + name: "two secondaries, with two not in time order series each, with 3 noop queries and one nil together", chkQuerierSeries: [][]ChunkSeries{{ NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}), NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), @@ -391,8 +391,7 @@ func 
TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { } qs = append(qs, tc.extraQueriers...) - // TODO(bwplotka): Add case of overlap to check if those are handled well. - merged := NewMergeChunkQuerier(p, qs, NewVerticalChunkSeriesMerger(nil)).Select(false, nil) + merged := NewMergeChunkQuerier(p, qs, NewCompactingChunkSeriesMerger(nil)).Select(false, nil) for merged.Next() { testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true") actualSeries := merged.At() @@ -412,7 +411,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { } type mockQuerier struct { - baseQuerier + LabelQuerier toReturn []Series } @@ -434,7 +433,7 @@ func (m *mockQuerier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Match } type mockChunkQurier struct { - baseQuerier + LabelQuerier toReturn []ChunkSeries } @@ -510,22 +509,22 @@ func TestChainSampleIterator(t *testing.T) { }{ { input: []chunkenc.Iterator{ - NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), + NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}), }, expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, }, { input: []chunkenc.Iterator{ - NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), - NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}), + NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}), + NewListSeriesIterator(samples{sample{2, 2}, sample{3, 3}}), }, expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}}, }, { input: []chunkenc.Iterator{ - NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{3, 3}}), - NewListSeriesIterator([]tsdbutil.Sample{sample{1, 1}, sample{4, 4}}), - NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{5, 5}}), + NewListSeriesIterator(samples{sample{0, 0}, sample{3, 3}}), + NewListSeriesIterator(samples{sample{1, 1}, sample{4, 4}}), + NewListSeriesIterator(samples{sample{2, 2}, sample{5, 5}}), }, expected: []tsdbutil.Sample{ 
sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}}, @@ -533,12 +532,12 @@ func TestChainSampleIterator(t *testing.T) { // Overlap. { input: []chunkenc.Iterator{ - NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), - NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{2, 2}}), - NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}), - NewListSeriesIterator([]tsdbutil.Sample{}), - NewListSeriesIterator([]tsdbutil.Sample{}), - NewListSeriesIterator([]tsdbutil.Sample{}), + NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}), + NewListSeriesIterator(samples{sample{0, 0}, sample{2, 2}}), + NewListSeriesIterator(samples{sample{2, 2}, sample{3, 3}}), + NewListSeriesIterator(samples{}), + NewListSeriesIterator(samples{}), + NewListSeriesIterator(samples{}), }, expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}}, }, @@ -558,24 +557,24 @@ func TestChainSampleIteratorSeek(t *testing.T) { }{ { input: []chunkenc.Iterator{ - NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), + NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}, sample{2, 2}}), }, seek: 1, expected: []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, }, { input: []chunkenc.Iterator{ - NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), - NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}), + NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}), + NewListSeriesIterator(samples{sample{2, 2}, sample{3, 3}}), }, seek: 2, expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}}, }, { input: []chunkenc.Iterator{ - NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{3, 3}}), - NewListSeriesIterator([]tsdbutil.Sample{sample{1, 1}, sample{4, 4}}), - NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{5, 5}}), + NewListSeriesIterator(samples{sample{0, 0}, sample{3, 3}}), + NewListSeriesIterator(samples{sample{1, 1}, 
sample{4, 4}}), + NewListSeriesIterator(samples{sample{2, 2}, sample{5, 5}}), }, seek: 2, expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}}, diff --git a/storage/generic.go b/storage/generic.go index 36283a40a..daa14555a 100644 --- a/storage/generic.go +++ b/storage/generic.go @@ -19,7 +19,7 @@ package storage import "github.com/prometheus/prometheus/pkg/labels" type genericQuerier interface { - baseQuerier + LabelQuerier Select(bool, *SelectHints, ...*labels.Matcher) genericSeriesSet } @@ -49,7 +49,7 @@ func (a *genericChunkSeriesSetAdapter) At() Labels { } type genericQuerierAdapter struct { - baseQuerier + LabelQuerier // One-of. If both are set, Querier will be used. q Querier @@ -64,11 +64,11 @@ func (q *genericQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matc } func newGenericQuerierFrom(q Querier) genericQuerier { - return &genericQuerierAdapter{baseQuerier: q, q: q} + return &genericQuerierAdapter{LabelQuerier: q, q: q} } func newGenericQuerierFromChunk(cq ChunkQuerier) genericQuerier { - return &genericQuerierAdapter{baseQuerier: cq, cq: cq} + return &genericQuerierAdapter{LabelQuerier: cq, cq: cq} } type querierAdapter struct { @@ -116,7 +116,7 @@ func (a *seriesMergerAdapter) Merge(s ...Labels) Labels { } type chunkSeriesMergerAdapter struct { - VerticalChunkSeriesMergerFunc + VerticalChunkSeriesMergeFunc } func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels { @@ -124,7 +124,7 @@ func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels { for _, ser := range s { buf = append(buf, ser.(ChunkSeries)) } - return a.VerticalChunkSeriesMergerFunc(buf...) + return a.VerticalChunkSeriesMergeFunc(buf...) 
} type noopGenericSeriesSet struct{} diff --git a/storage/interface.go b/storage/interface.go index dd558bb53..6ebb03842 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -37,11 +37,16 @@ type Appendable interface { Appender() Appender } +// SampleAndChunkQueryable allows retrieving samples as well as encoded samples in form of chunks. +type SampleAndChunkQueryable interface { + Queryable + ChunkQueryable +} + // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. Storage implements storage.SampleAppender. -// TODO(bwplotka): Add ChunkQueryable to Storage in next PR. type Storage interface { - Queryable + SampleAndChunkQueryable Appendable // StartTime returns the oldest timestamp stored in the storage. @@ -60,7 +65,7 @@ type Queryable interface { // Querier provides querying access over time series data of a fixed time range. type Querier interface { - baseQuerier + LabelQuerier // Select returns a set of series that matches the given label matchers. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. @@ -72,12 +77,12 @@ type Querier interface { // Use it when you need to have access to samples in encoded format. type ChunkQueryable interface { // ChunkQuerier returns a new ChunkQuerier on the storage. - ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, Warnings, error) + ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, error) } // ChunkQuerier provides querying access over time series data of a fixed time range. type ChunkQuerier interface { - baseQuerier + LabelQuerier // Select returns a set of series that matches the given label matchers. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. 
@@ -85,7 +90,8 @@ type ChunkQuerier interface { Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet } -type baseQuerier interface { +// LabelQuerier provides querying access over labels. +type LabelQuerier interface { // LabelValues returns all potential values for a label name. // It is not safe to use the strings beyond the lifefime of the querier. LabelValues(name string) ([]string, Warnings, error) @@ -111,6 +117,7 @@ type SelectHints struct { Range int64 // Range vector selector range in milliseconds. } +// TODO(bwplotka): Move to promql/engine_test.go? // QueryableFunc is an adapter to allow the use of ordinary functions as // Queryables. It follows the idea of http.HandlerFunc. type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error) @@ -169,20 +176,40 @@ func EmptySeriesSet() SeriesSet { } type errSeriesSet struct { - ws Warnings err error } func (s errSeriesSet) Next() bool { return false } func (s errSeriesSet) At() Series { return nil } func (s errSeriesSet) Err() error { return s.err } -func (s errSeriesSet) Warnings() Warnings { return s.ws } +func (s errSeriesSet) Warnings() Warnings { return nil } // ErrSeriesSet returns a series set that wraps an error. func ErrSeriesSet(err error) SeriesSet { return errSeriesSet{err: err} } +var emptyChunkSeriesSet = errChunkSeriesSet{} + +// EmptyChunkSeriesSet returns a chunk series set that's always empty. +func EmptyChunkSeriesSet() ChunkSeriesSet { + return emptyChunkSeriesSet +} + +type errChunkSeriesSet struct { + err error +} + +func (s errChunkSeriesSet) Next() bool { return false } +func (s errChunkSeriesSet) At() ChunkSeries { return nil } +func (s errChunkSeriesSet) Err() error { return s.err } +func (s errChunkSeriesSet) Warnings() Warnings { return nil } + +// ErrChunkSeriesSet returns a chunk series set that wraps an error. 
+func ErrChunkSeriesSet(err error) ChunkSeriesSet { + return errChunkSeriesSet{err: err} +} + // Series exposes a single time series and allows iterating over samples. type Series interface { Labels diff --git a/storage/remote/client.go b/storage/remote/client.go index 7ff957168..11fc05eb4 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -42,37 +42,86 @@ const maxErrMsgLen = 256 var userAgent = fmt.Sprintf("Prometheus/%s", version.Version) -var remoteReadQueriesTotal = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "read_queries_total", - Help: "The total number of remote read queries.", - }, - []string{remoteName, endpoint, "code"}, +var ( + remoteReadQueriesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "read_queries_total", + Help: "The total number of remote read queries.", + }, + []string{remoteName, endpoint, "code"}, + ) + remoteReadQueries = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "remote_read_queries", + Help: "The number of in-flight remote read queries.", + }, + []string{remoteName, endpoint}, + ) + remoteReadQueryDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "read_request_duration_seconds", + Help: "Histogram of the latency for remote read requests.", + Buckets: append(prometheus.DefBuckets, 25, 60), + }, + []string{remoteName, endpoint}, + ) ) -// Client allows reading and writing from/to a remote HTTP endpoint. -type Client struct { +func init() { + prometheus.MustRegister(remoteReadQueriesTotal, remoteReadQueries, remoteReadQueryDuration) +} + +// client allows reading and writing from/to a remote HTTP endpoint. +type client struct { remoteName string // Used to differentiate clients in metrics. 
url *config_util.URL client *http.Client timeout time.Duration + + readQueries prometheus.Gauge + readQueriesTotal *prometheus.CounterVec + readQueriesDuration prometheus.Observer } -// ClientConfig configures a Client. +// ClientConfig configures a client. type ClientConfig struct { URL *config_util.URL Timeout model.Duration HTTPClientConfig config_util.HTTPClientConfig } -func init() { - prometheus.MustRegister(remoteReadQueriesTotal) +// ReadClient uses the SAMPLES method of remote read to read series samples from remote server. +// TODO(bwplotka): Add streamed chunked remote read method as well (https://github.com/prometheus/prometheus/issues/5926). +type ReadClient interface { + Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) } -// NewClient creates a new Client. -func NewClient(remoteName string, conf *ClientConfig) (*Client, error) { +// newReadClient creates a new client for remote read. +func newReadClient(name string, conf *ClientConfig) (ReadClient, error) { + httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage", false) + if err != nil { + return nil, err + } + + return &client{ + remoteName: name, + url: conf.URL, + client: httpClient, + timeout: time.Duration(conf.Timeout), + readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()), + readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}), + readQueriesDuration: remoteReadQueryDuration.WithLabelValues(name, conf.URL.String()), + }, nil +} + +// NewWriteClient creates a new client for remote write. 
+func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) { httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage", false) if err != nil { return nil, err @@ -83,8 +132,8 @@ func NewClient(remoteName string, conf *ClientConfig) (*Client, error) { RoundTripper: t, } - return &Client{ - remoteName: remoteName, + return &client{ + remoteName: name, url: conf.URL, client: httpClient, timeout: time.Duration(conf.Timeout), @@ -97,7 +146,7 @@ type recoverableError struct { // Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled // and encoded bytes from codec.go. -func (c *Client) Store(ctx context.Context, req []byte) error { +func (c *client) Store(ctx context.Context, req []byte) error { httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req)) if err != nil { // Errors from NewRequest are from unparsable URLs, so are not @@ -150,17 +199,20 @@ func (c *Client) Store(ctx context.Context, req []byte) error { } // Name uniquely identifies the client. -func (c Client) Name() string { +func (c client) Name() string { return c.remoteName } // Endpoint is the remote read or write endpoint. -func (c Client) Endpoint() string { +func (c client) Endpoint() string { return c.url.String() } // Read reads from a remote endpoint. -func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { +func (c *client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { + c.readQueries.Inc() + defer c.readQueries.Dec() + req := &prompb.ReadRequest{ // TODO: Support batching multiple queries into one read request, // as the protobuf interface allows for it. 
@@ -200,6 +252,7 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe defer ht.Finish() } + start := time.Now() httpResp, err := c.client.Do(httpReq) if err != nil { return nil, errors.Wrap(err, "error sending request") @@ -208,9 +261,8 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe io.Copy(ioutil.Discard, httpResp.Body) httpResp.Body.Close() }() - - remoteReadTotalCounter := remoteReadQueriesTotal.WithLabelValues(c.remoteName, c.url.String(), strconv.Itoa(httpResp.StatusCode)) - remoteReadTotalCounter.Inc() + c.readQueriesDuration.Observe(time.Since(start).Seconds()) + c.readQueriesTotal.WithLabelValues(strconv.Itoa(httpResp.StatusCode)).Inc() compressed, err = ioutil.ReadAll(httpResp.Body) if err != nil { diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index e604ab65b..59e2128b4 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -25,7 +25,6 @@ import ( "github.com/pkg/errors" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/util/testutil" ) @@ -71,7 +70,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) { hash, err := toHash(conf) testutil.Ok(t, err) - c, err := NewClient(hash, conf) + c, err := NewWriteClient(hash, conf) testutil.Ok(t, err) err = c.Store(context.Background(), []byte{}) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 6985a5f6a..3181db120 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -138,25 +138,18 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, Samples: samples, }) } - if err := ss.Err(); err != nil { - return nil, ss.Warnings(), err - } - return resp, ss.Warnings(), nil + return resp, ss.Warnings(), ss.Err() } // FromQueryResult unpacks and sorts a QueryResult proto. 
func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet { series := make([]storage.Series, 0, len(res.Timeseries)) for _, ts := range res.Timeseries { - labels := labelProtosToLabels(ts.Labels) - if err := validateLabelsAndMetricName(labels); err != nil { + lbls := labelProtosToLabels(ts.Labels) + if err := validateLabelsAndMetricName(lbls); err != nil { return errSeriesSet{err: err} } - - series = append(series, &concreteSeries{ - labels: labels, - samples: ts.Samples, - }) + series = append(series, &concreteSeries{labels: lbls, samples: ts.Samples}) } if sortSeries { @@ -187,9 +180,8 @@ func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.R return 0, errors.Errorf("server does not support any of the requested response types: %v; supported: %v", accepted, supported) } -// StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller. -// TODO(bwplotka): Encode only what's needed. Fetch the encoded series from blocks instead of re-encoding everything. -func StreamChunkedReadResponses( +// TODO(bwplotka): Remove once tsdb supports ChunkQuerier. +func DeprecatedStreamChunkedReadResponses( stream io.Writer, queryIndex int64, ss storage.SeriesSet, @@ -315,6 +307,77 @@ func encodeChunks(iter chunkenc.Iterator, chks []prompb.Chunk, frameBytesLeft in return chks, nil } +// StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller. +// It expects Series set with populated chunks. 
+func StreamChunkedReadResponses( + stream io.Writer, + queryIndex int64, + ss storage.ChunkSeriesSet, + sortedExternalLabels []prompb.Label, + maxBytesInFrame int, +) (storage.Warnings, error) { + var ( + chks []prompb.Chunk + lbls []prompb.Label + ) + + for ss.Next() { + series := ss.At() + iter := series.Iterator() + lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels) + + frameBytesLeft := maxBytesInFrame + for _, lbl := range lbls { + frameBytesLeft -= lbl.Size() + } + + isNext := iter.Next() + + // Send at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. + for isNext { + chk := iter.At() + + if chk.Chunk == nil { + return ss.Warnings(), errors.Errorf("StreamChunkedReadResponses: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref) + } + + // Cut the chunk. + chks = append(chks, prompb.Chunk{ + MinTimeMs: chk.MinTime, + MaxTimeMs: chk.MaxTime, + Type: prompb.Chunk_Encoding(chk.Chunk.Encoding()), + Data: chk.Chunk.Bytes(), + }) + frameBytesLeft -= chks[len(chks)-1].Size() + + // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. + isNext = iter.Next() + if frameBytesLeft > 0 && isNext { + continue + } + + b, err := proto.Marshal(&prompb.ChunkedReadResponse{ + ChunkedSeries: []*prompb.ChunkedSeries{ + {Labels: lbls, Chunks: chks}, + }, + QueryIndex: queryIndex, + }) + if err != nil { + return ss.Warnings(), errors.Wrap(err, "marshal ChunkedReadResponse") + } + + if _, err := stream.Write(b); err != nil { + return ss.Warnings(), errors.Wrap(err, "write to stream") + } + chks = chks[:0] + } + if err := iter.Err(); err != nil { + return ss.Warnings(), err + } + } + return ss.Warnings(), ss.Err() +} + // MergeLabels merges two sets of sorted proto labels, preferring those in // primary to those in secondary when there is an overlap. 
func MergeLabels(primary, secondary []prompb.Label) []prompb.Label { diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index befa9a7b7..5fc66439b 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -217,9 +217,9 @@ func (m *queueManagerMetrics) unregister() { } } -// StorageClient defines an interface for sending a batch of samples to an +// WriteClient defines an interface for sending a batch of samples to an // external timeseries database. -type StorageClient interface { +type WriteClient interface { // Store stores the given samples in the remote storage. Store(context.Context, []byte) error // Name uniquely identifies the remote storage. @@ -229,7 +229,7 @@ type StorageClient interface { } // QueueManager manages a queue of samples to be sent to the Storage -// indicated by the provided StorageClient. Implements writeTo interface +// indicated by the provided WriteClient. Implements writeTo interface // used by WAL Watcher. type QueueManager struct { // https://golang.org/pkg/sync/atomic/#pkg-note-BUG @@ -243,7 +243,7 @@ type QueueManager struct { watcher *wal.Watcher clientMtx sync.RWMutex - storeClient StorageClient + storeClient WriteClient seriesMtx sync.Mutex seriesLabels map[uint64]labels.Labels @@ -272,7 +272,7 @@ func NewQueueManager( cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, - client StorageClient, + client WriteClient, flushDeadline time.Duration, ) *QueueManager { if logger == nil { @@ -440,13 +440,13 @@ func (t *QueueManager) SeriesReset(index int) { // SetClient updates the client used by a queue. Used when only client specific // fields are updated to avoid restarting the queue. 
-func (t *QueueManager) SetClient(c StorageClient) { +func (t *QueueManager) SetClient(c WriteClient) { t.clientMtx.Lock() t.storeClient = c t.clientMtx.Unlock() } -func (t *QueueManager) client() StorageClient { +func (t *QueueManager) client() WriteClient { t.clientMtx.RLock() defer t.clientMtx.RUnlock() return t.storeClient diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 323ee7e5e..39861a16f 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -51,7 +51,7 @@ func TestSampleDelivery(t *testing.T) { n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 samples, series := createTimeseries(n, n) - c := NewTestStorageClient() + c := NewTestWriteClient() c.expectSamples(samples[:len(samples)/2], series) cfg := config.DefaultQueueConfig @@ -81,7 +81,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { // Let's send one less sample than batch size, and wait the timeout duration n := 9 samples, series := createTimeseries(n, n) - c := NewTestStorageClient() + c := NewTestWriteClient() cfg := config.DefaultQueueConfig cfg.MaxShards = 1 @@ -125,7 +125,7 @@ func TestSampleDeliveryOrder(t *testing.T) { }) } - c := NewTestStorageClient() + c := NewTestWriteClient() c.expectSamples(samples, series) dir, err := ioutil.TempDir("", "TestSampleDeliveryOrder") @@ -145,7 +145,7 @@ func TestSampleDeliveryOrder(t *testing.T) { func TestShutdown(t *testing.T) { deadline := 1 * time.Second - c := NewTestBlockedStorageClient() + c := NewTestBlockedWriteClient() dir, err := ioutil.TempDir("", "TestShutdown") testutil.Ok(t, err) @@ -181,7 +181,7 @@ func TestShutdown(t *testing.T) { } func TestSeriesReset(t *testing.T) { - c := NewTestBlockedStorageClient() + c := NewTestBlockedWriteClient() deadline := 5 * time.Second numSegments := 4 numSeries := 25 @@ -210,7 +210,7 @@ func TestReshard(t *testing.T) { nSamples := config.DefaultQueueConfig.Capacity * size samples, series := createTimeseries(nSamples, 
nSeries) - c := NewTestStorageClient() + c := NewTestWriteClient() c.expectSamples(samples, series) cfg := config.DefaultQueueConfig @@ -245,7 +245,7 @@ func TestReshard(t *testing.T) { } func TestReshardRaceWithStop(t *testing.T) { - c := NewTestStorageClient() + c := NewTestWriteClient() var m *QueueManager h := sync.Mutex{} @@ -271,7 +271,7 @@ func TestReshardRaceWithStop(t *testing.T) { func TestReleaseNoninternedString(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") - c := NewTestStorageClient() + c := NewTestWriteClient() m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) m.Start() @@ -319,7 +319,7 @@ func TestShouldReshard(t *testing.T) { } for _, c := range cases { metrics := newQueueManagerMetrics(nil, "", "") - client := NewTestStorageClient() + client := NewTestWriteClient() m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline) m.numShards = c.startingShards m.samplesIn.incr(c.samplesIn) @@ -367,7 +367,7 @@ func getSeriesNameFromRef(r record.RefSeries) string { return "" } -type TestStorageClient struct { +type TestWriteClient struct { receivedSamples map[string][]prompb.Sample expectedSamples map[string][]prompb.Sample withWaitGroup bool @@ -376,15 +376,15 @@ type TestStorageClient struct { buf []byte } -func NewTestStorageClient() *TestStorageClient { - return &TestStorageClient{ +func NewTestWriteClient() *TestWriteClient { + return &TestWriteClient{ withWaitGroup: true, receivedSamples: map[string][]prompb.Sample{}, expectedSamples: map[string][]prompb.Sample{}, } } -func (c *TestStorageClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { +func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { if !c.withWaitGroup { return } @@ -404,7 +404,7 @@ func (c 
*TestStorageClient) expectSamples(ss []record.RefSample, series []record c.wg.Add(len(ss)) } -func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) { +func (c *TestWriteClient) waitForExpectedSamples(tb testing.TB) { if !c.withWaitGroup { return } @@ -418,7 +418,7 @@ func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) { } } -func (c *TestStorageClient) expectSampleCount(numSamples int) { +func (c *TestWriteClient) expectSampleCount(numSamples int) { if !c.withWaitGroup { return } @@ -427,14 +427,14 @@ func (c *TestStorageClient) expectSampleCount(numSamples int) { c.wg.Add(numSamples) } -func (c *TestStorageClient) waitForExpectedSampleCount() { +func (c *TestWriteClient) waitForExpectedSampleCount() { if !c.withWaitGroup { return } c.wg.Wait() } -func (c *TestStorageClient) Store(_ context.Context, req []byte) error { +func (c *TestWriteClient) Store(_ context.Context, req []byte) error { c.mtx.Lock() defer c.mtx.Unlock() // nil buffers are ok for snappy, ignore cast error. @@ -472,41 +472,41 @@ func (c *TestStorageClient) Store(_ context.Context, req []byte) error { return nil } -func (c *TestStorageClient) Name() string { - return "teststorageclient" +func (c *TestWriteClient) Name() string { + return "testwriteclient" } -func (c *TestStorageClient) Endpoint() string { +func (c *TestWriteClient) Endpoint() string { return "http://test-remote.com/1234" } -// TestBlockingStorageClient is a queue_manager StorageClient which will block +// TestBlockingWriteClient is a queue_manager WriteClient which will block // on any calls to Store(), until the request's Context is cancelled, at which // point the `numCalls` property will contain a count of how many times Store() // was called. 
-type TestBlockingStorageClient struct { +type TestBlockingWriteClient struct { numCalls uint64 } -func NewTestBlockedStorageClient() *TestBlockingStorageClient { - return &TestBlockingStorageClient{} +func NewTestBlockedWriteClient() *TestBlockingWriteClient { + return &TestBlockingWriteClient{} } -func (c *TestBlockingStorageClient) Store(ctx context.Context, _ []byte) error { +func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte) error { atomic.AddUint64(&c.numCalls, 1) <-ctx.Done() return nil } -func (c *TestBlockingStorageClient) NumCalls() uint64 { +func (c *TestBlockingWriteClient) NumCalls() uint64 { return atomic.LoadUint64(&c.numCalls) } -func (c *TestBlockingStorageClient) Name() string { - return "testblockingstorageclient" +func (c *TestBlockingWriteClient) Name() string { + return "testblockingwriteclient" } -func (c *TestBlockingStorageClient) Endpoint() string { +func (c *TestBlockingWriteClient) Endpoint() string { return "http://test-remote-blocking.com/1234" } @@ -516,7 +516,7 @@ func BenchmarkSampleDelivery(b *testing.B) { n := config.DefaultQueueConfig.MaxSamplesPerSend * 10 samples, series := createTimeseries(n, n) - c := NewTestStorageClient() + c := NewTestWriteClient() cfg := config.DefaultQueueConfig cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) @@ -568,7 +568,7 @@ func BenchmarkStartup(b *testing.B) { for n := 0; n < b.N; n++ { metrics := newQueueManagerMetrics(nil, "", "") - c := NewTestBlockedStorageClient() + c := NewTestBlockedWriteClient() m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) @@ -611,7 +611,7 @@ func TestProcessExternalLabels(t *testing.T) { } func TestCalculateDesiredShards(t *testing.T) { - c := NewTestStorageClient() + c := NewTestWriteClient() cfg := config.DefaultQueueConfig dir, err := ioutil.TempDir("", "TestCalculateDesiredShards") diff --git a/storage/remote/read.go 
b/storage/remote/read.go index 0b53ed7e7..39822a676 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -15,190 +15,162 @@ package remote import ( "context" - "fmt" - "time" - "github.com/prometheus/client_golang/prometheus" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) -var remoteReadQueries = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "remote_read_queries", - Help: "The number of in-flight remote read queries.", - }, - []string{remoteName, endpoint}, -) - -var remoteReadQueriesHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "read_request_duration_seconds", - Help: "Histogram of the latency for remote read requests.", - Buckets: append(prometheus.DefBuckets, 25, 60), - }, - []string{remoteName, endpoint}, -) - -func init() { - prometheus.MustRegister(remoteReadQueries, remoteReadQueriesHistogram) +type sampleAndChunkQueryableClient struct { + client ReadClient + externalLabels labels.Labels + requiredMatchers []*labels.Matcher + readRecent bool + callback startTimeCallback } -// QueryableClient returns a storage.Queryable which queries the given -// Client to select series sets. -func QueryableClient(c *Client) storage.Queryable { - remoteReadQueries.WithLabelValues(c.remoteName, c.url.String()) - remoteReadQueriesHistogram.WithLabelValues(c.remoteName, c.url.String()) +// NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets. 
+func NewSampleAndChunkQueryableClient( + c ReadClient, + externalLabels labels.Labels, + requiredMatchers []*labels.Matcher, + readRecent bool, + callback startTimeCallback, +) storage.SampleAndChunkQueryable { + return &sampleAndChunkQueryableClient{ + client: c, - return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return &querier{ - ctx: ctx, - mint: mint, - maxt: maxt, - client: c, - }, nil - }) + externalLabels: externalLabels, + requiredMatchers: requiredMatchers, + readRecent: readRecent, + callback: callback, + } +} + +func (c *sampleAndChunkQueryableClient) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + q := &querier{ + ctx: ctx, + mint: mint, + maxt: maxt, + client: c.client, + externalLabels: c.externalLabels, + requiredMatchers: c.requiredMatchers, + } + if c.readRecent { + return q, nil + } + + var ( + noop bool + err error + ) + q.maxt, noop, err = c.preferLocalStorage(mint, maxt) + if err != nil { + return nil, err + } + if noop { + return storage.NoopQuerier(), nil + } + return q, nil +} + +func (c *sampleAndChunkQueryableClient) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + cq := &chunkQuerier{ + querier: querier{ + ctx: ctx, + mint: mint, + maxt: maxt, + client: c.client, + externalLabels: c.externalLabels, + requiredMatchers: c.requiredMatchers, + }, + } + if c.readRecent { + return cq, nil + } + + var ( + noop bool + err error + ) + cq.querier.maxt, noop, err = c.preferLocalStorage(mint, maxt) + if err != nil { + return nil, err + } + if noop { + return storage.NoopChunkedQuerier(), nil + } + return cq, nil +} + +// preferLocalStorage returns noop if requested timeframe can be answered completely by the local TSDB, and +// reduces maxt if the timeframe can be partially answered by TSDB. 
+func (c *sampleAndChunkQueryableClient) preferLocalStorage(mint, maxt int64) (cmaxt int64, noop bool, err error) { + localStartTime, err := c.callback() + if err != nil { + return 0, false, err + } + cmaxt = maxt + + // Avoid queries whose time range is later than the first timestamp in local DB. + if mint > localStartTime { + return 0, true, nil + } + // Query only samples older than the first timestamp in local DB. + if maxt > localStartTime { + cmaxt = localStartTime + } + return cmaxt, false, nil } -// querier is an adapter to make a Client usable as a storage.Querier. type querier struct { ctx context.Context mint, maxt int64 - client *Client -} - -// Select implements storage.Querier and uses the given matchers to read series sets from the Client. -func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - query, err := ToQuery(q.mint, q.maxt, matchers, hints) - if err != nil { - return storage.ErrSeriesSet(err) - } - - remoteReadGauge := remoteReadQueries.WithLabelValues(q.client.remoteName, q.client.url.String()) - remoteReadGauge.Inc() - defer remoteReadGauge.Dec() - - remoteReadQueriesHistogram := remoteReadQueriesHistogram.WithLabelValues(q.client.remoteName, q.client.url.String()) - remoteReadQueriesHistogram.Observe(time.Since(time.Now()).Seconds()) - - res, err := q.client.Read(q.ctx, query) - if err != nil { - return storage.ErrSeriesSet(fmt.Errorf("remote_read: %v", err)) - } - - return FromQueryResult(sortSeries, res) -} - -// LabelValues implements storage.Querier and is a noop. -func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) { - // TODO implement? - return nil, nil, nil -} - -// LabelNames implements storage.Querier and is a noop. -func (q *querier) LabelNames() ([]string, storage.Warnings, error) { - // TODO implement? - return nil, nil, nil -} - -// Close implements storage.Querier and is a noop. 
-func (q *querier) Close() error { - return nil -} - -// ExternalLabelsHandler returns a storage.Queryable which creates a -// externalLabelsQuerier. -func ExternalLabelsHandler(next storage.Queryable, externalLabels labels.Labels) storage.Queryable { - return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - q, err := next.Querier(ctx, mint, maxt) - if err != nil { - return nil, err - } - return &externalLabelsQuerier{Querier: q, externalLabels: externalLabels}, nil - }) -} - -// externalLabelsQuerier is a querier which ensures that Select() results match -// the configured external labels. -type externalLabelsQuerier struct { - storage.Querier - - externalLabels labels.Labels -} - -// Select adds equality matchers for all external labels to the list of matchers -// before calling the wrapped storage.Queryable. The added external labels are -// removed from the returned series sets. -func (q externalLabelsQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - m, added := q.addExternalLabels(matchers) - return newSeriesSetFilter(q.Querier.Select(sortSeries, hints, m...), added) -} - -// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier -// if requested timeframe can be answered completely by the local TSDB, and -// reduces maxt if the timeframe can be partially answered by TSDB. -func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) storage.Queryable { - return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - localStartTime, err := cb() - if err != nil { - return nil, err - } - cmaxt := maxt - // Avoid queries whose timerange is later than the first timestamp in local DB. - if mint > localStartTime { - return storage.NoopQuerier(), nil - } - // Query only samples older than the first timestamp in local DB. 
- if maxt > localStartTime { - cmaxt = localStartTime - } - return next.Querier(ctx, mint, cmaxt) - }) -} - -// RequiredMatchersFilter returns a storage.Queryable which creates a -// requiredMatchersQuerier. -func RequiredMatchersFilter(next storage.Queryable, required []*labels.Matcher) storage.Queryable { - return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - q, err := next.Querier(ctx, mint, maxt) - if err != nil { - return nil, err - } - return &requiredMatchersQuerier{Querier: q, requiredMatchers: required}, nil - }) -} - -// requiredMatchersQuerier wraps a storage.Querier and requires Select() calls -// to match the given labelSet. -type requiredMatchersQuerier struct { - storage.Querier + client ReadClient + // Derived from configuration. + externalLabels labels.Labels requiredMatchers []*labels.Matcher } -// Select returns a NoopSeriesSet if the given matchers don't match the label -// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. -func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - ms := q.requiredMatchers - for _, m := range matchers { - for i, r := range ms { - if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value { - ms = append(ms[:i], ms[i+1:]...) +// Select implements storage.Querier and uses the given matchers to read series sets from the client. +// Select also adds equality matchers for all external labels to the list of matchers before calling remote endpoint. +// The added external labels are removed from the returned series sets. +// +// If requiredMatchers are given, select returns a NoopSeriesSet if the given matchers don't match the label set of the +// requiredMatchers. Otherwise it'll just call remote endpoint. 
+func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + if len(q.requiredMatchers) > 0 { + // Copy to not modify slice configured by user. + requiredMatchers := append([]*labels.Matcher{}, q.requiredMatchers...) + for _, m := range matchers { + for i, r := range requiredMatchers { + if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value { + // Requirement matched. + requiredMatchers = append(requiredMatchers[:i], requiredMatchers[i+1:]...) + break + } + } + if len(requiredMatchers) == 0 { break } } - if len(ms) == 0 { - break + if len(requiredMatchers) > 0 { + return storage.NoopSeriesSet() } } - if len(ms) > 0 { - return storage.NoopSeriesSet() + + m, added := q.addExternalLabels(matchers) + query, err := ToQuery(q.mint, q.maxt, m, hints) + if err != nil { + return storage.ErrSeriesSet(errors.Wrap(err, "toQuery")) } - return q.Querier.Select(sortSeries, hints, matchers...) + + res, err := q.client.Read(q.ctx, query) + if err != nil { + return storage.ErrSeriesSet(errors.Wrap(err, "remote_read")) + } + return newSeriesSetFilter(FromQueryResult(sortSeries, res), added) } // addExternalLabels adds matchers for each external label. External labels @@ -207,7 +179,7 @@ func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHi // We return the new set of matchers, along with a map of labels for which // matchers were added, so that these can later be removed from the result // time series again. -func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) { +func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) { el := make(labels.Labels, len(q.externalLabels)) copy(el, q.externalLabels) @@ -232,6 +204,35 @@ func (q externalLabelsQuerier) addExternalLabels(ms []*label return ms, el } +// LabelValues implements storage.Querier; it is not implemented for remote read yet and always returns an error. 
+func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) { + // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 + return nil, nil, errors.New("not implemented") +} + +// LabelNames implements storage.Querier and is a noop. +func (q *querier) LabelNames() ([]string, storage.Warnings, error) { + // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 + return nil, nil, errors.New("not implemented") +} + +// Close implements storage.Querier and is a noop. +func (q *querier) Close() error { + return nil +} + +// chunkQuerier is an adapter to make a client usable as a storage.ChunkQuerier. +type chunkQuerier struct { + querier +} + +// Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client. +// It uses remote.querier.Select so it supports external labels and required matchers if specified. +func (q *chunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { + // TODO(bwplotka) Support remote read chunked and allow returning chunks directly (TODO ticket). 
+ return storage.NewSeriesSetToChunkSet(q.querier.Select(sortSeries, hints, matchers...)) +} + func newSeriesSetFilter(ss storage.SeriesSet, toFilter labels.Labels) storage.SeriesSet { return &seriesSetFilter{ SeriesSet: ss, diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 6a9cdd712..73ac6f08b 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -22,6 +22,7 @@ import ( "sort" "testing" + "github.com/pkg/errors" config_util "github.com/prometheus/common/config" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" @@ -40,7 +41,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) { URL: &config_util.URL{ URL: &url.URL{ Scheme: "http", - Host: "localhost", + Host: "localhost1", }, }, } @@ -49,7 +50,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) { URL: &config_util.URL{ URL: &url.URL{ Scheme: "http", - Host: "localhost", + Host: "localhost2", }, }, } @@ -57,7 +58,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) { URL: &config_util.URL{ URL: &url.URL{ Scheme: "http", - Host: "localhost", + Host: "localhost3", }, }, } @@ -92,35 +93,17 @@ func TestNoDuplicateReadConfigs(t *testing.T) { } for _, tc := range cases { - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline) - conf := &config.Config{ - GlobalConfig: config.DefaultGlobalConfig, - RemoteReadConfigs: tc.cfgs, - } - err := s.ApplyConfig(conf) - gotError := err != nil - testutil.Equals(t, tc.err, gotError) - - err = s.Close() - testutil.Ok(t, err) - } -} - -func TestExternalLabelsQuerierSelect(t *testing.T) { - matchers := []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "job", "api-server"), - } - q := &externalLabelsQuerier{ - Querier: mockQuerier{}, - externalLabels: labels.Labels{ - {Name: "region", Value: "europe"}, - }, - } - want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) - have := q.Select(false, nil, matchers...) 
- - if !reflect.DeepEqual(want, have) { - t.Errorf("expected series set %+v, got %+v", want, have) + t.Run("", func(t *testing.T) { + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline) + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteReadConfigs: tc.cfgs, + } + err := s.ApplyConfig(conf) + gotError := err != nil + testutil.Equals(t, tc.err, gotError) + testutil.Ok(t, s.Close()) + }) } } @@ -179,7 +162,7 @@ func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) { } for i, test := range tests { - q := &externalLabelsQuerier{Querier: mockQuerier{}, externalLabels: test.el} + q := &querier{externalLabels: test.el} matchers, added := q.addExternalLabels(test.inMatchers) sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name }) @@ -225,180 +208,307 @@ func TestSeriesSetFilter(t *testing.T) { } } -type mockQuerier struct { - ctx context.Context - mint, maxt int64 - - storage.Querier +type mockedRemoteClient struct { + got *prompb.Query + store []*prompb.TimeSeries } -type mockSeriesSet struct { - storage.SeriesSet -} - -func (mockQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { - return mockSeriesSet{} -} - -func TestPreferLocalStorageFilter(t *testing.T) { - ctx := context.Background() - - tests := []struct { - localStartTime int64 - mint int64 - maxt int64 - querier storage.Querier - }{ - { - localStartTime: int64(100), - mint: int64(0), - maxt: int64(50), - querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50}, - }, - { - localStartTime: int64(20), - mint: int64(0), - maxt: int64(50), - querier: mockQuerier{ctx: ctx, mint: 0, maxt: 20}, - }, - { - localStartTime: int64(20), - mint: int64(30), - maxt: int64(50), - querier: storage.NoopQuerier(), - }, +func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) { + if c.got != nil { + return nil, errors.Errorf("expected only one call to remote client 
got: %v", query) } + c.got = query - for i, test := range tests { - f := PreferLocalStorageFilter( - storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil - }), - func() (int64, error) { return test.localStartTime, nil }, - ) - - q, err := f.Querier(ctx, test.mint, test.maxt) - if err != nil { - t.Fatal(err) - } - - if test.querier != q { - t.Errorf("%d. expected querier %+v, got %+v", i, test.querier, q) - } - } -} - -func TestRequiredMatchersFilter(t *testing.T) { - ctx := context.Background() - - f := RequiredMatchersFilter( - storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return mockQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil - }), - []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "special", "label")}, - ) - - want := &requiredMatchersQuerier{ - Querier: mockQuerier{ctx: ctx, mint: 0, maxt: 50}, - requiredMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "special", "label")}, - } - have, err := f.Querier(ctx, 0, 50) + matchers, err := FromLabelMatchers(query.Matchers) if err != nil { - t.Fatal(err) + return nil, err } - if !reflect.DeepEqual(want, have) { - t.Errorf("expected querier %+v, got %+v", want, have) + q := &prompb.QueryResult{} + for _, s := range c.store { + l := labelProtosToLabels(s.Labels) + var notMatch bool + + for _, m := range matchers { + if v := l.Get(m.Name); v != "" { + if !m.Matches(v) { + notMatch = true + break + } + } + } + + if !notMatch { + q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels}) + } } + return q, nil } -func TestRequiredLabelsQuerierSelect(t *testing.T) { - tests := []struct { - requiredMatchers []*labels.Matcher +func (c *mockedRemoteClient) reset() { + c.got = nil +} + +// NOTE: We don't need to test ChunkQuerier as it's uses querier for all operations anyway. 
+func TestSampleAndChunkQueryableClient(t *testing.T) { + m := &mockedRemoteClient{ + // Samples does not matter for below tests. + store: []*prompb.TimeSeries{ + {Labels: []prompb.Label{{Name: "a", Value: "b"}}}, + {Labels: []prompb.Label{{Name: "a", Value: "b3"}, {Name: "region", Value: "us"}}}, + {Labels: []prompb.Label{{Name: "a", Value: "b2"}, {Name: "region", Value: "europe"}}}, + }, + } + + for _, tc := range []struct { + name string matchers []*labels.Matcher - seriesSet storage.SeriesSet + mint, maxt int64 + externalLabels labels.Labels + requiredMatchers []*labels.Matcher + readRecent bool + callback startTimeCallback + + expectedQuery *prompb.Query + expectedSeries []labels.Labels }{ { - requiredMatchers: []*labels.Matcher{}, + name: "empty", + mint: 1, maxt: 2, matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), + labels.MustNewMatcher(labels.MatchNotEqual, "a", "something"), }, - seriesSet: mockSeriesSet{}, - }, - { - requiredMatchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), - }, - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), - }, - seriesSet: mockSeriesSet{}, - }, - { - requiredMatchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), - }, - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchRegexp, "special", "label"), - }, - seriesSet: storage.NoopSeriesSet(), - }, - { - requiredMatchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), - }, - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "different"), - }, - seriesSet: storage.NoopSeriesSet(), - }, - { - requiredMatchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), - }, - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), - labels.MustNewMatcher(labels.MatchEqual, "foo", 
"bar"), - }, - seriesSet: mockSeriesSet{}, - }, - { - requiredMatchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), - labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), - }, - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), - labels.MustNewMatcher(labels.MatchEqual, "foo", "baz"), - }, - seriesSet: storage.NoopSeriesSet(), - }, - { - requiredMatchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), - labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), - }, - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "special", "label"), - labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), - }, - seriesSet: mockSeriesSet{}, - }, - } + readRecent: true, - for i, test := range tests { - q := &requiredMatchersQuerier{ - Querier: mockQuerier{}, - requiredMatchers: test.requiredMatchers, - } + expectedQuery: &prompb.Query{ + StartTimestampMs: 1, + EndTimestampMs: 2, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_NEQ, Name: "a", Value: "something"}, + }, + }, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "b"), + labels.FromStrings("a", "b2", "region", "europe"), + labels.FromStrings("a", "b3", "region", "us"), + }, + }, + { + name: "external labels specified, not explicitly requested", + mint: 1, maxt: 2, + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "a", "something"), + }, + readRecent: true, + externalLabels: labels.Labels{ + {Name: "region", Value: "europe"}, + }, - have := q.Select(false, nil, test.matchers...) 
+ expectedQuery: &prompb.Query{ + StartTimestampMs: 1, + EndTimestampMs: 2, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_NEQ, Name: "a", Value: "something"}, + {Type: prompb.LabelMatcher_EQ, Name: "region", Value: "europe"}, + }, + }, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "b"), + labels.FromStrings("a", "b2"), + }, + }, + { + name: "external labels specified, explicitly requested europe", + mint: 1, maxt: 2, + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "a", "something"), + labels.MustNewMatcher(labels.MatchEqual, "region", "europe"), + }, + readRecent: true, + externalLabels: labels.Labels{ + {Name: "region", Value: "europe"}, + }, - if want := test.seriesSet; want != have { - t.Errorf("%d. expected series set %+v, got %+v", i, want, have) - } - if want, have := test.requiredMatchers, q.requiredMatchers; !reflect.DeepEqual(want, have) { - t.Errorf("%d. requiredMatchersQuerier.Select() has modified the matchers", i) - } + expectedQuery: &prompb.Query{ + StartTimestampMs: 1, + EndTimestampMs: 2, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_NEQ, Name: "a", Value: "something"}, + {Type: prompb.LabelMatcher_EQ, Name: "region", Value: "europe"}, + }, + }, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "b"), + labels.FromStrings("a", "b2", "region", "europe"), + }, + }, + { + name: "external labels specified, explicitly requested not europe", + mint: 1, maxt: 2, + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "a", "something"), + labels.MustNewMatcher(labels.MatchEqual, "region", "us"), + }, + readRecent: true, + externalLabels: labels.Labels{ + {Name: "region", Value: "europe"}, + }, + + expectedQuery: &prompb.Query{ + StartTimestampMs: 1, + EndTimestampMs: 2, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_NEQ, Name: "a", Value: "something"}, + {Type: prompb.LabelMatcher_EQ, Name: "region", Value: "us"}, + }, + }, + 
expectedSeries: []labels.Labels{ + labels.FromStrings("a", "b"), + labels.FromStrings("a", "b3", "region", "us"), + }, + }, + { + name: "prefer local storage", + mint: 0, maxt: 50, + callback: func() (i int64, err error) { return 100, nil }, + readRecent: false, + + expectedQuery: &prompb.Query{ + StartTimestampMs: 0, + EndTimestampMs: 50, + Matchers: []*prompb.LabelMatcher{}, + }, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "b"), + labels.FromStrings("a", "b2", "region", "europe"), + labels.FromStrings("a", "b3", "region", "us"), + }, + }, + { + name: "prefer local storage, limited time", + mint: 0, maxt: 50, + callback: func() (i int64, err error) { return 20, nil }, + readRecent: false, + + expectedQuery: &prompb.Query{ + StartTimestampMs: 0, + EndTimestampMs: 20, + Matchers: []*prompb.LabelMatcher{}, + }, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "b"), + labels.FromStrings("a", "b2", "region", "europe"), + labels.FromStrings("a", "b3", "region", "us"), + }, + }, + { + name: "prefer local storage, skipped", + mint: 30, maxt: 50, + callback: func() (i int64, err error) { return 20, nil }, + readRecent: false, + + expectedQuery: nil, + expectedSeries: nil, // Noop should be used. 
+ }, + { + name: "required matcher specified, user also specifies same", + mint: 1, maxt: 2, + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "b2"), + }, + readRecent: true, + requiredMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "b2"), + }, + + expectedQuery: &prompb.Query{ + StartTimestampMs: 1, + EndTimestampMs: 2, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "a", Value: "b2"}, + }, + }, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "b2", "region", "europe"), + }, + }, + { + name: "required matcher specified", + mint: 1, maxt: 2, + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "b2"), + }, + readRecent: true, + requiredMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "b2"), + }, + + expectedQuery: &prompb.Query{ + StartTimestampMs: 1, + EndTimestampMs: 2, + Matchers: []*prompb.LabelMatcher{ + {Type: prompb.LabelMatcher_EQ, Name: "a", Value: "b2"}, + }, + }, + expectedSeries: []labels.Labels{ + labels.FromStrings("a", "b2", "region", "europe"), + }, + }, + { + name: "required matcher specified, given matcher does not match", + mint: 1, maxt: 2, + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "a", "something"), + }, + readRecent: true, + requiredMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "b2"), + }, + + expectedQuery: nil, + expectedSeries: nil, // Given matchers does not match with required ones, noop expected. + }, + { + name: "required matcher specified, given matcher does not match2", + mint: 1, maxt: 2, + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "x", "something"), + }, + readRecent: true, + requiredMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "a", "b2"), + }, + expectedQuery: nil, + expectedSeries: nil, // Given matchers does not match with required ones, noop expected. 
+ }, + } { + t.Run(tc.name, func(t *testing.T) { + m.reset() + + c := NewSampleAndChunkQueryableClient( + m, + tc.externalLabels, + tc.requiredMatchers, + tc.readRecent, + tc.callback, + ) + q, err := c.Querier(context.TODO(), tc.mint, tc.maxt) + testutil.Ok(t, err) + defer testutil.Ok(t, q.Close()) + + ss := q.Select(true, nil, tc.matchers...) + testutil.Ok(t, err) + testutil.Equals(t, storage.Warnings(nil), ss.Warnings()) + + testutil.Equals(t, tc.expectedQuery, m.got) + + var got []labels.Labels + for ss.Next() { + got = append(got, ss.At().Labels()) + } + testutil.Ok(t, ss.Err()) + testutil.Equals(t, tc.expectedSeries, got) + + }) } } diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 9f2d8a58d..fd4a0529b 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -51,8 +51,8 @@ type Storage struct { rws *WriteStorage - // For reads - queryables []storage.Queryable + // For reads. + queryables []storage.SampleAndChunkQueryable localStartTimeCallback startTimeCallback } @@ -61,6 +61,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal if l == nil { l = log.NewNopLogger() } + s := &Storage{ logger: logging.Dedupe(l, 1*time.Minute), localStartTimeCallback: stCallback, @@ -80,7 +81,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { // Update read clients readHashes := make(map[string]struct{}) - queryables := make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) + queryables := make([]storage.SampleAndChunkQueryable, 0, len(conf.RemoteReadConfigs)) for _, rrConf := range conf.RemoteReadConfigs { hash, err := toHash(rrConf) if err != nil { @@ -96,12 +97,12 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { // Set the queue name to the config hash if the user has not set // a name in their remote write config so we can still differentiate // between queues that have the same remote write endpoint. 
- name := string(hash[:6]) + name := hash[:6] if rrConf.Name != "" { name = rrConf.Name } - c, err := NewClient(name, &ClientConfig{ + c, err := newReadClient(name, &ClientConfig{ URL: rrConf.URL, Timeout: rrConf.RemoteTimeout, HTTPClientConfig: rrConf.HTTPClientConfig, @@ -110,15 +111,13 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { return err } - q := QueryableClient(c) - q = ExternalLabelsHandler(q, conf.GlobalConfig.ExternalLabels) - if len(rrConf.RequiredMatchers) > 0 { - q = RequiredMatchersFilter(q, labelsToEqualityMatchers(rrConf.RequiredMatchers)) - } - if !rrConf.ReadRecent { - q = PreferLocalStorageFilter(q, s.localStartTimeCallback) - } - queryables = append(queryables, q) + queryables = append(queryables, NewSampleAndChunkQueryableClient( + c, + conf.GlobalConfig.ExternalLabels, + labelsToEqualityMatchers(rrConf.RequiredMatchers), + rrConf.ReadRecent, + s.localStartTimeCallback, + )) } s.queryables = queryables @@ -148,7 +147,25 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie } queriers = append(queriers, q) } - return storage.NewMergeQuerier(nil, queriers, storage.ChainedSeriesMerge), nil + return storage.NewMergeQuerier(storage.NoopQuerier(), queriers, storage.ChainedSeriesMerge), nil +} + +// ChunkQuerier returns a storage.MergeQuerier combining the remote client queriers +// of each configured remote read endpoint. +func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + s.mtx.Lock() + queryables := s.queryables + s.mtx.Unlock() + + queriers := make([]storage.ChunkQuerier, 0, len(queryables)) + for _, queryable := range queryables { + q, err := queryable.ChunkQuerier(ctx, mint, maxt) + if err != nil { + return nil, err + } + queriers = append(queriers, q) + } + return storage.NewMergeChunkQuerier(storage.NoopChunkedQuerier(), queriers, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil } // Appender implements storage.Storage. 
diff --git a/storage/remote/write.go b/storage/remote/write.go index c71239736..28ea8aef1 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -113,12 +113,12 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { // Set the queue name to the config hash if the user has not set // a name in their remote write config so we can still differentiate // between queues that have the same remote write endpoint. - name := string(hash[:6]) + name := hash[:6] if rwConf.Name != "" { name = rwConf.Name } - c, err := NewClient(name, &ClientConfig{ + c, err := NewWriteClient(name, &ClientConfig{ URL: rwConf.URL, Timeout: rwConf.RemoteTimeout, HTTPClientConfig: rwConf.HTTPClientConfig, diff --git a/storage/series.go b/storage/series.go index 58b5d98b3..237d2e376 100644 --- a/storage/series.go +++ b/storage/series.go @@ -14,6 +14,7 @@ package storage import ( + "math" "sort" "github.com/prometheus/prometheus/pkg/labels" @@ -23,23 +24,34 @@ import ( ) type listSeriesIterator struct { - samples []tsdbutil.Sample + samples Samples idx int } +type samples []tsdbutil.Sample + +func (s samples) Get(i int) tsdbutil.Sample { return s[i] } +func (s samples) Len() int { return len(s) } + +// Samples interface allows to work on arrays of types that are compatible with tsdbutil.Sample. +type Samples interface { + Get(i int) tsdbutil.Sample + Len() int +} + // NewListSeriesIterator returns listSeriesIterator that allows to iterate over provided samples. Does not handle overlaps. 
-func NewListSeriesIterator(samples []tsdbutil.Sample) chunkenc.Iterator { +func NewListSeriesIterator(samples Samples) chunkenc.Iterator { return &listSeriesIterator{samples: samples, idx: -1} } func (it *listSeriesIterator) At() (int64, float64) { - s := it.samples[it.idx] + s := it.samples.Get(it.idx) return s.T(), s.V() } func (it *listSeriesIterator) Next() bool { it.idx++ - return it.idx < len(it.samples) + return it.idx < it.samples.Len() } func (it *listSeriesIterator) Seek(t int64) bool { @@ -47,12 +59,12 @@ func (it *listSeriesIterator) Seek(t int64) bool { it.idx = 0 } // Do binary search between current position and end. - it.idx = sort.Search(len(it.samples)-it.idx, func(i int) bool { - s := it.samples[i+it.idx] + it.idx = sort.Search(it.samples.Len()-it.idx, func(i int) bool { + s := it.samples.Get(i + it.idx) return s.T() >= t }) - return it.idx < len(it.samples) + return it.idx < it.samples.Len() } func (it *listSeriesIterator) Err() error { return nil } @@ -84,7 +96,6 @@ type chunkSetToSeriesSet struct { chkIterErr error sameSeriesChunks []Series - bufIterator chunkenc.Iterator } // NewSeriesSetFromChunkSeriesSet converts ChunkSeriesSet to SeriesSet by decoding chunks one by one. 
@@ -101,10 +112,9 @@ func (c *chunkSetToSeriesSet) Next() bool { c.sameSeriesChunks = c.sameSeriesChunks[:0] for iter.Next() { - c.sameSeriesChunks = append(c.sameSeriesChunks, &chunkToSeries{ + c.sameSeriesChunks = append(c.sameSeriesChunks, &chunkToSeriesDecoder{ labels: c.ChunkSeriesSet.At().Labels(), - chk: iter.At(), - buf: c.bufIterator, + Meta: iter.At(), }) } @@ -128,11 +138,82 @@ func (c *chunkSetToSeriesSet) Err() error { return c.ChunkSeriesSet.Err() } -type chunkToSeries struct { +type chunkToSeriesDecoder struct { + chunks.Meta + labels labels.Labels - chk chunks.Meta - buf chunkenc.Iterator } -func (s *chunkToSeries) Labels() labels.Labels { return s.labels } -func (s *chunkToSeries) Iterator() chunkenc.Iterator { return s.chk.Chunk.Iterator(s.buf) } +func (s *chunkToSeriesDecoder) Labels() labels.Labels { return s.labels } + +// TODO(bwplotka): Can we provide any chunkenc buffer? +func (s *chunkToSeriesDecoder) Iterator() chunkenc.Iterator { return s.Chunk.Iterator(nil) } + +type seriesSetToChunkSet struct { + SeriesSet +} + +// NewSeriesSetToChunkSet converts SeriesSet to ChunkSeriesSet by encoding chunks from samples. +func NewSeriesSetToChunkSet(chk SeriesSet) ChunkSeriesSet { + return &seriesSetToChunkSet{SeriesSet: chk} +} + +func (c *seriesSetToChunkSet) Next() bool { + if c.Err() != nil || !c.SeriesSet.Next() { + return false + } + return true +} + +func (c *seriesSetToChunkSet) At() ChunkSeries { + return &seriesToChunkEncoder{ + Series: c.SeriesSet.At(), + } +} + +func (c *seriesSetToChunkSet) Err() error { + return c.SeriesSet.Err() +} + +type seriesToChunkEncoder struct { + Series +} + +// TODO(bwplotka): Currently encoder will just naively build one chunk, without limit. 
Split it: https://github.com/prometheus/tsdb/issues/670 +func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { + chk := chunkenc.NewXORChunk() + app, err := chk.Appender() + if err != nil { + return errChunksIterator{err: err} + } + mint := int64(math.MaxInt64) + maxt := int64(math.MinInt64) + + seriesIter := s.Series.Iterator() + for seriesIter.Next() { + t, v := seriesIter.At() + app.Append(t, v) + + maxt = t + if mint == math.MaxInt64 { + mint = t + } + } + if err := seriesIter.Err(); err != nil { + return errChunksIterator{err: err} + } + + return NewListChunkSeriesIterator(chunks.Meta{ + MinTime: mint, + MaxTime: maxt, + Chunk: chk, + }) +} + +type errChunksIterator struct { + err error +} + +func (e errChunksIterator) At() chunks.Meta { return chunks.Meta{} } +func (e errChunksIterator) Next() bool { return false } +func (e errChunksIterator) Err() error { return e.err } diff --git a/storage/series_test.go b/storage/series_test.go index 197e0f58a..09fd95141 100644 --- a/storage/series_test.go +++ b/storage/series_test.go @@ -27,11 +27,11 @@ type MockSeries struct { SampleIteratorFn func() chunkenc.Iterator } -func NewListSeries(lset labels.Labels, samples []tsdbutil.Sample) *MockSeries { +func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *MockSeries { return &MockSeries{ labels: lset, SampleIteratorFn: func() chunkenc.Iterator { - return NewListSeriesIterator(samples) + return NewListSeriesIterator(samples(s)) }, } } diff --git a/tsdb/db.go b/tsdb/db.go index a2892b6d9..8c8b049d5 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -39,11 +39,11 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/wal" + "golang.org/x/sync/errgroup" // Load the package into main to make sure minium Go version is met. 
_ "github.com/prometheus/prometheus/tsdb/goversion" - "github.com/prometheus/prometheus/tsdb/wal" - "golang.org/x/sync/errgroup" ) const ( @@ -420,6 +420,11 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu return dbWritable.Querier(ctx, mint, maxt) } +func (db *DBReadOnly) ChunkQuerier(context.Context, int64, int64) (storage.ChunkQuerier, error) { + // TODO(bwplotka): Implement in next PR. + return nil, errors.New("not implemented") +} + // Blocks returns a slice of block readers for persisted blocks. func (db *DBReadOnly) Blocks() ([]BlockReader, error) { select { @@ -1345,6 +1350,11 @@ func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, err }, nil } +func (db *DB) ChunkQuerier(context.Context, int64, int64) (storage.ChunkQuerier, error) { + // TODO(bwplotka): Implement in next PR. + return nil, errors.New("not implemented") +} + func rangeForTimestamp(t int64, width int64) (maxt int64) { return (t/width)*width + width } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index df6d2e658..33cb5a9a3 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -172,6 +172,7 @@ type TSDBAdminStats interface { // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. type API struct { + // TODO(bwplotka): Change to SampleAndChunkQueryable in next PR. Queryable storage.Queryable QueryEngine *promql.Engine @@ -204,7 +205,7 @@ func init() { // NewAPI returns an initialized API type. 
func NewAPI( qe *promql.Engine, - q storage.Queryable, + q storage.SampleAndChunkQueryable, tr func(context.Context) TargetRetriever, ar func(context.Context) AlertmanagerRetriever, configFunc func() config.Config, @@ -1199,8 +1200,8 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels)) for name, value := range externalLabels { sortedExternalLabels = append(sortedExternalLabels, prompb.Label{ - Name: string(name), - Value: string(value), + Name: name, + Value: value, }) } sort.Slice(sortedExternalLabels, func(i, j int) bool { @@ -1215,78 +1216,142 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { switch responseType { case prompb.ReadRequest_STREAMED_XOR_CHUNKS: - w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") + api.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedExternalLabels) + default: + api.remoteReadSamples(ctx, w, req, externalLabels, sortedExternalLabels) + } +} - f, ok := w.(http.Flusher) - if !ok { - http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError) +func (api *API) remoteReadSamples(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) { + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + + // On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response. 
+ resp := prompb.ReadResponse{ + Results: make([]*prompb.QueryResult, len(req.Queries)), + } + for i, query := range req.Queries { + if err := func() error { + filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) + if err != nil { + return err + } + + querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs) + if err != nil { + return err + } + defer func() { + if err := querier.Close(); err != nil { + level.Warn(api.logger).Log("msg", "Error on querier close", "err", err.Error()) + } + }() + + var hints *storage.SelectHints + if query.Hints != nil { + hints = &storage.SelectHints{ + Start: query.Hints.StartMs, + End: query.Hints.EndMs, + Step: query.Hints.StepMs, + Func: query.Hints.Func, + Grouping: query.Hints.Grouping, + Range: query.Hints.RangeMs, + By: query.Hints.By, + } + } + + var ws storage.Warnings + resp.Results[i], ws, err = remote.ToQueryResult(querier.Select(false, hints, filteredMatchers...), api.remoteReadSampleLimit) + if err != nil { + return err + } + + for _, w := range ws { + level.Warn(api.logger).Log("msg", "Warnings on remote read query", "err", w.Error()) + } + + for _, ts := range resp.Results[i].Timeseries { + ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels) + } + + return nil + }(); err != nil { + if httpErr, ok := err.(remote.HTTPError); ok { + http.Error(w, httpErr.Error(), httpErr.Status()) + return + } + http.Error(w, err.Error(), http.StatusInternalServerError) return } - for i, query := range req.Queries { - ws, err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) (storage.Warnings, error) { + } + + if err := remote.EncodeReadResponse(&resp, w); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (api *API) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, 
externalLabels map[string]string, sortedExternalLabels []prompb.Label) { + w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") + + f, ok := w.(http.Flusher) + if !ok { + http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError) + return + } + + for i, query := range req.Queries { + if err := func() error { + filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) + if err != nil { + return err + } + + // TODO(bwplotka): Use ChunkQuerier once ready in tsdb package. + querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs) + if err != nil { + return err + } + defer func() { + if err := querier.Close(); err != nil { + level.Warn(api.logger).Log("msg", "Error on chunk querier close", "warnings", err.Error()) + } + }() + + var hints *storage.SelectHints + if query.Hints != nil { + hints = &storage.SelectHints{ + Start: query.Hints.StartMs, + End: query.Hints.EndMs, + Step: query.Hints.StepMs, + Func: query.Hints.Func, + Grouping: query.Hints.Grouping, + Range: query.Hints.RangeMs, + By: query.Hints.By, + } + } + + ws, err := remote.DeprecatedStreamChunkedReadResponses( + remote.NewChunkedWriter(w, f), + int64(i), // The streaming API has to provide the series sorted. - set := querier.Select(true, hints, filteredMatchers...) 
- - return remote.StreamChunkedReadResponses( - remote.NewChunkedWriter(w, f), - int64(i), - set, - sortedExternalLabels, - api.remoteReadMaxBytesInFrame, - ) - }) - for _, w := range ws { - level.Warn(api.logger).Log("msg", "warnings on remote read query", "err", w.Error()) - } + querier.Select(true, hints, filteredMatchers...), + sortedExternalLabels, + api.remoteReadMaxBytesInFrame, + ) if err != nil { - if httpErr, ok := err.(remote.HTTPError); ok { - http.Error(w, httpErr.Error(), httpErr.Status()) - return - } - http.Error(w, err.Error(), http.StatusInternalServerError) + return err + } + + for _, w := range ws { + level.Warn(api.logger).Log("msg", "Warnings on chunked remote read query", "warnings", w.Error()) + } + return nil + }(); err != nil { + if httpErr, ok := err.(remote.HTTPError); ok { + http.Error(w, httpErr.Error(), httpErr.Status()) return } - } - default: - w.Header().Set("Content-Type", "application/x-protobuf") - w.Header().Set("Content-Encoding", "snappy") - - // On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response. - resp := prompb.ReadResponse{ - Results: make([]*prompb.QueryResult, len(req.Queries)), - } - for i, query := range req.Queries { - ws, err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) (storage.Warnings, error) { - set := querier.Select(false, hints, filteredMatchers...) 
- - var ( - ws storage.Warnings - err error - ) - resp.Results[i], ws, err = remote.ToQueryResult(set, api.remoteReadSampleLimit) - if err != nil { - return ws, err - } - - for _, ts := range resp.Results[i].Timeseries { - ts.Labels = remote.MergeLabels(ts.Labels, sortedExternalLabels) - } - return ws, nil - }) - for _, w := range ws { - level.Warn(api.logger).Log("msg", "warnings on remote read query", "err", w.Error()) - } - if err != nil { - if httpErr, ok := err.(remote.HTTPError); ok { - http.Error(w, httpErr.Error(), httpErr.Status()) - return - } - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } - - if err := remote.EncodeReadResponse(&resp, w); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -1319,37 +1384,6 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe return filteredMatchers, nil } -func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) (storage.Warnings, error)) (storage.Warnings, error) { - filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) - if err != nil { - return nil, err - } - - querier, err := api.Queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs) - if err != nil { - return nil, err - } - defer func() { - if err := querier.Close(); err != nil { - level.Warn(api.logger).Log("msg", "Error on querier close", "err", err.Error()) - } - }() - - var hints *storage.SelectHints - if query.Hints != nil { - hints = &storage.SelectHints{ - Start: query.Hints.StartMs, - End: query.Hints.EndMs, - Step: query.Hints.StepMs, - Func: query.Hints.Func, - Grouping: query.Hints.Grouping, - Range: query.Hints.RangeMs, - By: query.Hints.By, - } - } - return seriesHandleFn(querier, hints, filteredMatchers) -} - func (api *API) deleteSeries(r *http.Request) 
apiFuncResult { if !api.enableAdmin { return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil} diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 54b8b633c..18861f6df 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -360,9 +360,7 @@ func TestEndpoints(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dbDir) - remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) { - return 0, nil - }, dbDir, 1*time.Second) + remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, nil, dbDir, 1*time.Second) err = remote.ApplyConfig(&config.Config{ RemoteReadConfigs: []*config.RemoteReadConfig{