diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 203eafb07..c22a2cd95 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1164,7 +1164,7 @@ func (n notReadyAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar return 0, tsdb.ErrNotReady } -func (n notReadyAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) { +func (n notReadyAppender) AppendHistogram(ref uint64, l labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) { return 0, tsdb.ErrNotReady } diff --git a/promql/value.go b/promql/value.go index fa3a71d35..cddaf7435 100644 --- a/promql/value.go +++ b/promql/value.go @@ -21,6 +21,7 @@ import ( "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" @@ -295,6 +296,10 @@ func (ssi *storageSeriesIterator) At() (t int64, v float64) { return p.T, p.V } +func (ssi *storageSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return 0, histogram.SparseHistogram{} +} + func (ssi *storageSeriesIterator) Next() bool { ssi.curr++ return ssi.curr < len(ssi.points) diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index f49695877..1f2e065f1 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -35,7 +35,7 @@ func (a nopAppender) Append(uint64, labels.Labels, int64, float64) (uint64, erro func (a nopAppender) AppendExemplar(uint64, labels.Labels, exemplar.Exemplar) (uint64, error) { return 0, nil } -func (a nopAppender) AppendHistogram(uint64, labels.Labels, histogram.SparseHistogram) (uint64, error) { +func (a nopAppender) AppendHistogram(uint64, labels.Labels, int64, histogram.SparseHistogram) (uint64, error) { return 0, nil } func (a nopAppender) Commit() error { return nil } @@ -47,6 +47,11 @@ type sample struct { v float64 } +type hist struct { + h histogram.SparseHistogram + t int64 +} + // collectResultAppender records all samples that were added through the appender. // It can be used as its zero value or be backed by another appender it writes samples through. type collectResultAppender struct { @@ -56,9 +61,9 @@ type collectResultAppender struct { rolledbackResult []sample pendingExemplars []exemplar.Exemplar resultExemplars []exemplar.Exemplar - resultHistograms []histogram.SparseHistogram - pendingHistograms []histogram.SparseHistogram - rolledbackHistograms []histogram.SparseHistogram + resultHistograms []hist + pendingHistograms []hist + rolledbackHistograms []hist } func (a *collectResultAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { @@ -91,13 +96,13 @@ func (a *collectResultAppender) AppendExemplar(ref uint64, l labels.Labels, e ex return a.next.AppendExemplar(ref, l, e) } -func (a *collectResultAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) { - a.pendingHistograms = append(a.pendingHistograms, sh) +func (a *collectResultAppender) AppendHistogram(ref uint64, l labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) { + a.pendingHistograms = append(a.pendingHistograms, hist{h: sh, t: t}) if a.next == nil { return 0, nil } - return a.next.AppendHistogram(ref, l, sh) + return a.next.AppendHistogram(ref, l, t, sh) } func (a *collectResultAppender) Commit() error { diff --git a/storage/buffer.go b/storage/buffer.go index feca1d91e..b6196a0c7 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -16,6 +16,7 @@ package storage import ( "math" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" ) @@ -197,6 +198,10 @@ func (it *sampleRingIterator) At() (int64, float64) { return it.r.at(it.i) } +func (it *sampleRingIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return 0, histogram.SparseHistogram{} +} + func (r *sampleRing) at(i int) (int64, float64) { j := (r.f + i) % len(r.buf) s := r.buf[j] diff --git a/storage/buffer_test.go b/storage/buffer_test.go index b67af6de9..2e0abfd0c 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -17,6 +17,7 @@ import ( "math/rand" "testing" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/stretchr/testify/require" ) @@ -194,8 +195,11 @@ type mockSeriesIterator struct { func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) } func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } -func (m *mockSeriesIterator) Next() bool { return m.next() } -func (m *mockSeriesIterator) Err() error { return m.err() } +func (m *mockSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return 0, histogram.SparseHistogram{} +} +func (m *mockSeriesIterator) Next() bool { return m.next() } +func (m *mockSeriesIterator) Err() error { return m.err() } type fakeSeriesIterator struct { nsamples int64 @@ -211,6 +215,10 @@ func (it *fakeSeriesIterator) At() (int64, float64) { return it.idx * it.step, 123 // value doesn't matter } +func (it *fakeSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return it.idx * it.step, histogram.SparseHistogram{} // value doesn't matter +} + func (it *fakeSeriesIterator) Next() bool { it.idx++ return it.idx < it.nsamples diff --git a/storage/fanout.go b/storage/fanout.go index df323a316..754544d35 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -173,14 +173,14 @@ func (f *fanoutAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar. return ref, nil } -func (f *fanoutAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) { - ref, err := f.primary.AppendHistogram(ref, l, sh) +func (f *fanoutAppender) AppendHistogram(ref uint64, l labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) { + ref, err := f.primary.AppendHistogram(ref, l, t, sh) if err != nil { return ref, err } for _, appender := range f.secondaries { - if _, err := appender.AppendHistogram(ref, l, sh); err != nil { + if _, err := appender.AppendHistogram(ref, l, t, sh); err != nil { return 0, err } } diff --git a/storage/interface.go b/storage/interface.go index a435c6060..f90ded1b1 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -221,7 +221,7 @@ type HistogramAppender interface { // to Append() at any point. Adding the sample via Append() returns a new // reference number. // If the reference is 0 it must not be used for caching. - AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) + AppendHistogram(ref uint64, l labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) } // SeriesSet contains a set of series. diff --git a/storage/merge.go b/storage/merge.go index 81e45d55d..ac43d11c0 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -481,6 +482,13 @@ func (c *chainSampleIterator) At() (t int64, v float64) { return c.curr.At() } +func (c *chainSampleIterator) AtHistogram() (int64, histogram.SparseHistogram) { + if c.curr == nil { + panic("chainSampleIterator.AtHistogram() called before first .Next() or after .Next() returned false.") + } + return c.curr.AtHistogram() +} + func (c *chainSampleIterator) Next() bool { if c.h == nil { c.h = samplesIteratorHeap{} diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 0bd1b9762..388d3090c 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -26,6 +26,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/prompb" @@ -368,6 +369,10 @@ func (c *concreteSeriesIterator) At() (t int64, v float64) { return s.Timestamp, s.Value } +func (c *concreteSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return 0, histogram.SparseHistogram{} +} + // Next implements storage.SeriesIterator. func (c *concreteSeriesIterator) Next() bool { c.cur++ diff --git a/storage/remote/write.go b/storage/remote/write.go index f17505852..7d8f57d96 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -238,7 +238,7 @@ func (t *timestampTracker) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar. return 0, nil } -func (t *timestampTracker) AppendHistogram(_ uint64, _ labels.Labels, _ histogram.SparseHistogram) (uint64, error) { +func (t *timestampTracker) AppendHistogram(_ uint64, _ labels.Labels, _ int64, _ histogram.SparseHistogram) (uint64, error) { return 0, errors.New("not implemented") } diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index bc8581b01..d7698ecdb 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -140,7 +140,7 @@ func (*mockAppendable) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Ex return 0, nil } -func (*mockAppendable) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) { +func (*mockAppendable) AppendHistogram(ref uint64, l labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) { // noop until we implement sparse histograms over remote write return 0, nil } diff --git a/storage/series.go b/storage/series.go index d92b4fd18..86471a143 100644 --- a/storage/series.go +++ b/storage/series.go @@ -17,6 +17,7 @@ import ( "math" "sort" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -90,6 +91,10 @@ func (it *listSeriesIterator) At() (int64, float64) { return s.T(), s.V() } +func (it *listSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return 0, histogram.SparseHistogram{} +} + func (it *listSeriesIterator) Next() bool { it.idx++ return it.idx < it.samples.Len() diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 63d3a6111..d28e34812 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -36,6 +36,14 @@ func (e Encoding) String() string { return "" } +func IsValidEncoding(e Encoding) bool { + switch e { + case EncXOR, EncSHS: + return true + } + return false +} + // The different available chunk encodings. const ( EncNone Encoding = iota @@ -89,6 +97,9 @@ type Iterator interface { // At returns the current timestamp/value pair. // Before the iterator has advanced At behaviour is unspecified. At() (int64, float64) + // AtHistogram returns the current timestamp/histogram pair. + // Before the iterator has advanced AtHistogram behaviour is unspecified. + AtHistogram() (int64, histogram.SparseHistogram) // Err returns the current error. It should be used only after iterator is // exhausted, that is `Next` or `Seek` returns false. Err() error @@ -103,8 +114,11 @@ type nopIterator struct{} func (nopIterator) Seek(int64) bool { return false } func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 } -func (nopIterator) Next() bool { return false } -func (nopIterator) Err() error { return nil } +func (nopIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return math.MinInt64, histogram.SparseHistogram{} +} +func (nopIterator) Next() bool { return false } +func (nopIterator) Err() error { return nil } // Pool is used to create and reuse chunk references to avoid allocations. type Pool interface { @@ -115,6 +129,7 @@ type Pool interface { // pool is a memory pool of chunk objects. type pool struct { xor sync.Pool + shs sync.Pool } // NewPool returns a new pool. @@ -125,6 +140,11 @@ func NewPool() Pool { return &XORChunk{b: bstream{}} }, }, + shs: sync.Pool{ + New: func() interface{} { + return &HistoChunk{b: bstream{}} + }, + }, } } @@ -135,6 +155,12 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { c.b.stream = b c.b.count = 0 return c, nil + case EncSHS: + // TODO: update metadata + c := p.shs.Get().(*HistoChunk) + c.b.stream = b + c.b.count = 0 + return c, nil } return nil, errors.Errorf("invalid chunk encoding %q", e) } @@ -152,6 +178,18 @@ func (p *pool) Put(c Chunk) error { xc.b.stream = nil xc.b.count = 0 p.xor.Put(c) + case EncSHS: + // TODO: update metadata + sh, ok := c.(*HistoChunk) + // This may happen often with wrapped chunks. Nothing we can really do about + // it but returning an error would cause a lot of allocations again. Thus, + // we just skip it. + if !ok { + return nil + } + sh.b.stream = nil + sh.b.count = 0 + p.shs.Put(c) default: return errors.Errorf("invalid chunk encoding %q", c.Encoding()) } @@ -165,6 +203,20 @@ func FromData(e Encoding, d []byte) (Chunk, error) { switch e { case EncXOR: return &XORChunk{b: bstream{count: 0, stream: d}}, nil + case EncSHS: + // TODO: update metadata + return &HistoChunk{b: bstream{count: 0, stream: d}}, nil + } + return nil, errors.Errorf("invalid chunk encoding %q", e) +} + +// NewEmptyChunk returns an empty chunk for the given encoding. +func NewEmptyChunk(e Encoding) (Chunk, error) { + switch e { + case EncXOR: + return NewXORChunk(), nil + case EncSHS: + return NewHistoChunk(), nil } return nil, errors.Errorf("invalid chunk encoding %q", e) } diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go index 08f2070e6..a043f4caf 100644 --- a/tsdb/chunkenc/histo.go +++ b/tsdb/chunkenc/histo.go @@ -191,10 +191,9 @@ func (c *HistoChunk) iterator(it Iterator) *histoIterator { } // Iterator implements the Chunk interface. -// TODO return interface type? -//func (c *HistoChunk) Iterator(it Iterator) *histoIterator { -// return c.iterator(it) -//} +func (c *HistoChunk) Iterator(it Iterator) Iterator { + return c.iterator(it) +} type histoAppender struct { c *HistoChunk // this is such that during the first append we can set the metadata on the chunk. not sure if that's how it should work @@ -422,7 +421,12 @@ func (it *histoIterator) Seek(t int64) bool { } return true } -func (it *histoIterator) At() (t int64, h histogram.SparseHistogram) { + +func (it *histoIterator) At() (int64, float64) { + panic("cannot call histoIterator.At().") +} + +func (it *histoIterator) AtHistogram() (int64, histogram.SparseHistogram) { return it.t, histogram.SparseHistogram{ Count: it.cnt, ZeroCount: it.zcnt, diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go index 907756a68..de534b0b6 100644 --- a/tsdb/chunkenc/histo_test.go +++ b/tsdb/chunkenc/histo_test.go @@ -76,7 +76,7 @@ func TestHistoChunkSameBuckets(t *testing.T) { require.NoError(t, it1.Err()) var res1 []res for it1.Next() { - ts, h := it1.At() + ts, h := it1.AtHistogram() res1 = append(res1, res{t: ts, h: h}) } require.NoError(t, it1.Err()) diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index d1dc09f8f..374959221 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -277,6 +277,10 @@ func (it *xorIterator) At() (int64, float64) { return it.t, it.val } +func (it *xorIterator) AtHistogram() (int64, histogram.SparseHistogram) { + panic("cannot call xorIterator.AtHistogram().") +} + func (it *xorIterator) Err() error { return it.err } diff --git a/tsdb/head.go b/tsdb/head.go index 42e4f86f0..e112c9113 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1195,14 +1195,14 @@ func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Ex return a.app.AppendExemplar(ref, l, e) } -func (a *initAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) { +func (a *initAppender) AppendHistogram(ref uint64, l labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) { if a.app != nil { - return a.app.AppendHistogram(ref, l, sh) + return a.app.AppendHistogram(ref, l, t, sh) } - //a.head.initTime(sh.Ts) FIXME(ganesh) + a.head.initTime(t) a.app = a.head.appender() - return a.app.AppendHistogram(ref, l, sh) + return a.app.AppendHistogram(ref, l, t, sh) } var _ storage.GetRef = &initAppender{} @@ -1359,10 +1359,12 @@ type headAppender struct { mint, maxt int64 exemplarAppender ExemplarStorage - series []record.RefSeries - samples []record.RefSample - exemplars []exemplarWithSeriesRef - sampleSeries []*memSeries + series []record.RefSeries + samples []record.RefSample + exemplars []exemplarWithSeriesRef + sampleSeries []*memSeries + histograms []record.RefHistogram + histogramSeries []*memSeries appendID, cleanupAppendIDsBelow uint64 closed bool @@ -1457,9 +1459,63 @@ func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Ex return s.ref, nil } -func (a *headAppender) AppendHistogram(ref uint64, _ labels.Labels, sh histogram.SparseHistogram) (uint64, error) { - // TODO. - return 0, nil +func (a *headAppender) AppendHistogram(ref uint64, lset labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) { + if t < a.minValidTime { + a.head.metrics.outOfBoundSamples.Inc() + return 0, storage.ErrOutOfBounds + } + + s := a.head.series.getByID(ref) + if s == nil { + // Ensure no empty labels have gotten through. + lset = lset.WithoutEmpty() + if len(lset) == 0 { + return 0, errors.Wrap(ErrInvalidSample, "empty labelset") + } + + if l, dup := lset.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) + } + + var created bool + var err error + s, created, err = a.head.getOrCreate(lset.Hash(), lset) + if err != nil { + return 0, err + } + if created { + a.series = append(a.series, record.RefSeries{ + Ref: s.ref, + Labels: lset, + }) + } + } + + s.Lock() + if err := s.appendableHistogram(t, sh); err != nil { + s.Unlock() + if err == storage.ErrOutOfOrderSample { + a.head.metrics.outOfOrderSamples.Inc() + } + return 0, err + } + s.pendingCommit = true + s.Unlock() + + if t < a.mint { + a.mint = t + } + if t > a.maxt { + a.maxt = t + } + + a.histograms = append(a.histograms, record.RefHistogram{ + Ref: s.ref, + T: t, + H: sh, + }) + a.histogramSeries = append(a.histogramSeries, s) + return s.ref, nil } var _ storage.GetRef = &headAppender{} @@ -1572,6 +1628,24 @@ func (a *headAppender) Commit() (err error) { a.head.metrics.chunksCreated.Inc() } } + total += len(a.histograms) // TODO: different metric? + for i, s := range a.histograms { + series = a.histogramSeries[i] + series.Lock() + ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper) + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) + series.pendingCommit = false + series.Unlock() + + if !ok { + total-- + a.head.metrics.outOfOrderSamples.Inc() + } + if chunkCreated { + a.head.metrics.chunks.Inc() + a.head.metrics.chunksCreated.Inc() + } + } a.head.metrics.samplesAppended.Add(float64(total)) a.head.updateMinMaxTime(a.mint, a.maxt) @@ -2347,15 +2421,24 @@ func (s *memSeries) maxTime() int64 { return c.maxTime } -func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { +func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { s.mmapCurrentHeadChunk(chunkDiskMapper) s.headChunk = &memChunk{ - chunk: chunkenc.NewXORChunk(), minTime: mint, maxTime: math.MinInt64, } + if chunkenc.IsValidEncoding(e) { + var err error + s.headChunk.chunk, err = chunkenc.NewEmptyChunk(e) + if err != nil { + panic(err) // This should never happen. + } + } else { + s.headChunk.chunk = chunkenc.NewXORChunk() + } + // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. s.nextAt = rangeForTimestamp(mint, s.chunkRange) @@ -2409,6 +2492,28 @@ func (s *memSeries) appendable(t int64, v float64) error { return nil } +// appendableHistogram checks whether the given sample is valid for appending to the series. +func (s *memSeries) appendableHistogram(t int64, sh histogram.SparseHistogram) error { + c := s.head() + if c == nil { + return nil + } + + if t > c.maxTime { + return nil + } + if t < c.maxTime { + return storage.ErrOutOfOrderSample + } + // TODO: do it for histogram. + // We are allowing exact duplicates as we can encounter them in valid cases + // like federation and erroring out at that time would be extremely noisy. + //if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { + // return storage.ErrDuplicateSampleForTimestamp + //} + return nil +} + // chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk. // If garbageCollect is true, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage. @@ -2475,38 +2580,11 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { // isolation for this append.) // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { - // Based on Gorilla white papers this offers near-optimal compression ratio - // so anything bigger that this has diminishing returns and increases - // the time range within which we have to decompress all samples. - const samplesPerChunk = 120 + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper) + if !sampleInOrder { + return sampleInOrder, chunkCreated + } - c := s.head() - - if c == nil { - if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { - // Out of order sample. Sample timestamp is already in the mmaped chunks, so ignore it. - return false, false - } - // There is no chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(t, chunkDiskMapper) - chunkCreated = true - } - numSamples := c.chunk.NumSamples() - - // Out of order sample. - if c.maxTime >= t { - return false, chunkCreated - } - // If we reach 25% of a chunk's desired sample count, set a definitive time - // at which to start the next chunk. - // At latest it must happen at the timestamp set when the chunk was cut. - if numSamples == samplesPerChunk/4 { - s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) - } - if t >= s.nextAt { - c = s.cutNewHeadChunk(t, chunkDiskMapper) - chunkCreated = true - } s.app.Append(t, v) c.maxTime = t @@ -2523,6 +2601,64 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper return true, chunkCreated } +// appendHistogram adds the sparse histogram. +// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. +func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper) + if !sampleInOrder { + return sampleInOrder, chunkCreated + } + + s.app.AppendHistogram(t, sh) + + c.maxTime = t + + if appendID > 0 { + s.txs.add(appendID) + } + + return true, chunkCreated +} + +// appendPreprocessor takes care of cutting new chunks and m-mapping old chunks. +// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. +// This should be called only when appending data. +func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) (c *memChunk, sampleInOrder, chunkCreated bool) { + // Based on Gorilla white papers this offers near-optimal compression ratio + // so anything bigger that this has diminishing returns and increases + // the time range within which we have to decompress all samples. + const samplesPerChunk = 120 + + c = s.head() + + if c == nil { + if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { + // Out of order sample. Sample timestamp is already in the mmaped chunks, so ignore it. + return c, false, false + } + // There is no chunk in this series yet, create the first chunk for the sample. + c = s.cutNewHeadChunk(t, e, chunkDiskMapper) + chunkCreated = true + } + numSamples := c.chunk.NumSamples() + + // Out of order sample. + if c.maxTime >= t { + return c, false, chunkCreated + } + // If we reach 25% of a chunk's desired sample count, set a definitive time + // at which to start the next chunk. + // At latest it must happen at the timestamp set when the chunk was cut. + if numSamples == samplesPerChunk/4 { + s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) + } + if t >= s.nextAt { + c = s.cutNewHeadChunk(t, e, chunkDiskMapper) + chunkCreated = true + } + return c, true, chunkCreated +} + // cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after // acquiring lock. func (s *memSeries) cleanupAppendIDsBelow(bound uint64) { diff --git a/tsdb/querier.go b/tsdb/querier.go index af5007fc1..771fce76e 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -21,6 +21,7 @@ import ( "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -631,6 +632,9 @@ func (p *populateWithDelSeriesIterator) Seek(t int64) bool { } func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() } +func (p *populateWithDelSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return 0, histogram.SparseHistogram{} +} func (p *populateWithDelSeriesIterator) Err() error { if err := p.populateWithDelGenericSeriesIterator.Err(); err != nil { @@ -818,6 +822,10 @@ func (it *DeletedIterator) At() (int64, float64) { return it.Iter.At() } +func (it *DeletedIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return 0, histogram.SparseHistogram{} +} + func (it *DeletedIterator) Seek(t int64) bool { if it.Iter.Err() != nil { return false diff --git a/tsdb/record/record.go b/tsdb/record/record.go index b4ee77f0f..1d5873f04 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -20,6 +20,7 @@ import ( "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/tombstones" @@ -67,6 +68,13 @@ type RefExemplar struct { Labels labels.Labels } +// RefHistogram is a histogram. +type RefHistogram struct { + Ref uint64 + T int64 + H histogram.SparseHistogram +} + // Decoder decodes series, sample, and tombstone records. // The zero value is ready to use. type Decoder struct { diff --git a/tsdb/tsdbutil/buffer.go b/tsdb/tsdbutil/buffer.go index a24d50472..1eb54f147 100644 --- a/tsdb/tsdbutil/buffer.go +++ b/tsdb/tsdbutil/buffer.go @@ -16,6 +16,7 @@ package tsdbutil import ( "math" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" ) @@ -159,6 +160,10 @@ func (it *sampleRingIterator) At() (int64, float64) { return it.r.at(it.i) } +func (it *sampleRingIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return 0, histogram.SparseHistogram{} +} + func (r *sampleRing) at(i int) (int64, float64) { j := (r.f + i) % len(r.buf) s := r.buf[j] diff --git a/tsdb/tsdbutil/buffer_test.go b/tsdb/tsdbutil/buffer_test.go index a66786b62..0c6175219 100644 --- a/tsdb/tsdbutil/buffer_test.go +++ b/tsdb/tsdbutil/buffer_test.go @@ -18,6 +18,7 @@ import ( "sort" "testing" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/stretchr/testify/require" ) @@ -150,6 +151,10 @@ func (it *listSeriesIterator) At() (int64, float64) { return s.t, s.v } +func (it *listSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) { + return 0, histogram.SparseHistogram{} +} + func (it *listSeriesIterator) Next() bool { it.idx++ return it.idx < len(it.list)