diff --git a/db.go b/db.go index 34f4336a7..823ccf1c0 100644 --- a/db.go +++ b/db.go @@ -234,7 +234,7 @@ func (s *Shard) appendBatch(samples []hashedSample) error { } // TODO(fabxc): randomize over time - if s.head.stats.SampleCount/uint64(s.head.stats.ChunkCount) > 400 { + if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 400 { select { case s.persistCh <- struct{}{}: go func() { diff --git a/querier.go b/querier.go index e1d641b4d..387c26766 100644 --- a/querier.go +++ b/querier.go @@ -663,6 +663,11 @@ func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) { return b.buf.last() } +// Buffer returns an iterator over the buffered data. +func (b *BufferedSeriesIterator) Buffer() SeriesIterator { + return b.buf.iterator() +} + // Seek advances the iterator to the element at time t or greater. func (b *BufferedSeriesIterator) Seek(t int64) bool { t0 := t - b.buf.delta @@ -741,6 +746,38 @@ func (r *sampleRing) reset() { r.f = 0 } +func (r *sampleRing) iterator() SeriesIterator { + return &sampleRingIterator{r: r, i: -1} +} + +type sampleRingIterator struct { + r *sampleRing + i int +} + +func (it *sampleRingIterator) Next() bool { + it.i++ + return it.i < it.r.l +} + +func (it *sampleRingIterator) Seek(int64) bool { + return false +} + +func (it *sampleRingIterator) Err() error { + return nil +} + +func (it *sampleRingIterator) Values() (int64, float64) { + return it.r.at(it.i) +} + +func (r *sampleRing) at(i int) (int64, float64) { + j := (r.f + i) % len(r.buf) + s := r.buf[j] + return s.t, s.v +} + // add adds a sample to the ring buffer and frees all samples that fall // out of the delta range. func (r *sampleRing) add(t int64, v float64) {