prometheus/tsdb/head_read.go

746 lines
23 KiB
Go

// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"errors"
"fmt"
"math"
"sync"
"github.com/go-kit/log/level"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
)
func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
return h.exemplars.ExemplarQuerier(ctx)
}
// Index returns an IndexReader against the block.
func (h *Head) Index() (IndexReader, error) {
return h.indexRange(math.MinInt64, math.MaxInt64), nil
}
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
if hmin := h.MinTime(); hmin > mint {
mint = hmin
}
return &headIndexReader{head: h, mint: mint, maxt: maxt}
}
type headIndexReader struct {
head *Head
mint, maxt int64
}
func (h *headIndexReader) Close() error {
return nil
}
func (h *headIndexReader) Symbols() index.StringIter {
return h.head.postings.Symbols()
}
// SortedLabelValues returns label values present in the head for the
// specific label name that are within the time range mint to maxt.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (h *headIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
values, err := h.LabelValues(ctx, name, matchers...)
if err == nil {
slices.Sort(values)
}
return values, err
}
// LabelValues returns label values present in the head for the
// specific label name that are within the time range mint to maxt.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (h *headIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
return []string{}, nil
}
if len(matchers) == 0 {
return h.head.postings.LabelValues(ctx, name), nil
}
return labelValuesWithMatchers(ctx, h, name, matchers...)
}
// LabelNames returns all the unique label names present in the head
// that are within the time range mint to maxt.
func (h *headIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) {
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
return []string{}, nil
}
if len(matchers) == 0 {
labelNames := h.head.postings.LabelNames()
slices.Sort(labelNames)
return labelNames, nil
}
return labelNamesWithMatchers(ctx, h, matchers...)
}
// Postings returns the postings list iterator for the label pairs.
func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
switch len(values) {
case 0:
return index.EmptyPostings(), nil
case 1:
return h.head.postings.Get(name, values[0]), nil
default:
res := make([]index.Postings, 0, len(values))
for _, value := range values {
if p := h.head.postings.Get(name, value); !index.IsEmptyPostingsType(p) {
res = append(res, p)
}
}
return index.Merge(ctx, res...), nil
}
}
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
series := make([]*memSeries, 0, 128)
// Fetch all the series only once.
for p.Next() {
s := h.head.series.getByID(chunks.HeadSeriesRef(p.At()))
if s == nil {
level.Debug(h.head.logger).Log("msg", "Looked up series not found")
} else {
series = append(series, s)
}
}
if err := p.Err(); err != nil {
return index.ErrPostings(fmt.Errorf("expand postings: %w", err))
}
slices.SortFunc(series, func(a, b *memSeries) int {
return labels.Compare(a.lset, b.lset)
})
// Convert back to list.
ep := make([]storage.SeriesRef, 0, len(series))
for _, p := range series {
ep = append(ep, storage.SeriesRef(p.ref))
}
return index.NewListPostings(ep)
}
// Series returns the series for the given reference.
func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
s := h.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
h.head.metrics.seriesNotFound.Inc()
return storage.ErrNotFound
}
builder.Assign(s.lset)
s.Lock()
defer s.Unlock()
*chks = (*chks)[:0]
for i, c := range s.mmappedChunks {
// Do not expose chunks that are outside of the specified range.
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue
}
*chks = append(*chks, chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))),
})
}
if s.headChunks != nil {
var maxTime int64
var i, j int
for i = s.headChunks.len() - 1; i >= 0; i-- {
chk := s.headChunks.atOffset(i)
if i == 0 {
// Set the head chunk as open (being appended to) for the first headChunk.
maxTime = math.MaxInt64
} else {
maxTime = chk.maxTime
}
if chk.OverlapsClosedInterval(h.mint, h.maxt) {
*chks = append(*chks, chunks.Meta{
MinTime: chk.minTime,
MaxTime: maxTime,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))),
})
}
j++
}
}
return nil
}
// headChunkID returns the HeadChunkID referred to by the given position.
// * 0 <= pos < len(s.mmappedChunks) refer to s.mmappedChunks[pos]
// * pos >= len(s.mmappedChunks) refers to s.headChunks linked list.
func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
return chunks.HeadChunkID(pos) + s.firstChunkID
}
// oooHeadChunkID returns the HeadChunkID referred to by the given position.
// * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos]
// * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID
}
// LabelValueFor returns label value for the given label name in the series referred to by ID.
func (h *headIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef, label string) (string, error) {
memSeries := h.head.series.getByID(chunks.HeadSeriesRef(id))
if memSeries == nil {
return "", storage.ErrNotFound
}
value := memSeries.lset.Get(label)
if value == "" {
return "", storage.ErrNotFound
}
return value, nil
}
// LabelNamesFor returns all the label names for the series referred to by IDs.
// The names returned are sorted.
func (h *headIndexReader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error) {
namesMap := make(map[string]struct{})
for _, id := range ids {
if ctx.Err() != nil {
return nil, ctx.Err()
}
memSeries := h.head.series.getByID(chunks.HeadSeriesRef(id))
if memSeries == nil {
return nil, storage.ErrNotFound
}
memSeries.lset.Range(func(lbl labels.Label) {
namesMap[lbl.Name] = struct{}{}
})
}
names := make([]string, 0, len(namesMap))
for name := range namesMap {
names = append(names, name)
}
slices.Sort(names)
return names, nil
}
// Chunks returns a ChunkReader against the block.
func (h *Head) Chunks() (ChunkReader, error) {
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State(math.MinInt64, math.MaxInt64))
}
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) {
h.closedMtx.Lock()
defer h.closedMtx.Unlock()
if h.closed {
return nil, errors.New("can't read from a closed head")
}
if hmin := h.MinTime(); hmin > mint {
mint = hmin
}
return &headChunkReader{
head: h,
mint: mint,
maxt: maxt,
isoState: is,
}, nil
}
type headChunkReader struct {
head *Head
mint, maxt int64
isoState *isolationState
}
func (h *headChunkReader) Close() error {
if h.isoState != nil {
h.isoState.Close()
}
return nil
}
// ChunkOrIterable returns the chunk for the reference number.
func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
chk, _, err := h.chunk(meta, false)
return chk, nil, err
}
// ChunkWithCopy returns the chunk for the reference number.
// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk.
func (h *headChunkReader) ChunkWithCopy(meta chunks.Meta) (chunkenc.Chunk, int64, error) {
return h.chunk(meta, true)
}
// chunk returns the chunk for the reference number.
// If copyLastChunk is true, then it makes a copy of the head chunk if asked for it.
// Also returns max time of the chunk.
func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
sid, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
s := h.head.series.getByID(sid)
// This means that the series has been garbage collected.
if s == nil {
return nil, 0, storage.ErrNotFound
}
s.Lock()
c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool)
if err != nil {
s.Unlock()
return nil, 0, err
}
defer func() {
if !headChunk {
// Set this to nil so that Go GC can collect it after it has been used.
c.chunk = nil
c.prev = nil
h.head.memChunkPool.Put(c)
}
}()
// This means that the chunk is outside the specified range.
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock()
return nil, 0, storage.ErrNotFound
}
chk, maxTime := c.chunk, c.maxTime
if headChunk && isOpen && copyLastChunk {
// The caller may ask to copy the head chunk in order to take the
// bytes of the chunk without causing the race between read and append.
b := s.headChunks.chunk.Bytes()
newB := make([]byte, len(b))
copy(newB, b) // TODO(codesome): Use bytes.Clone() when we upgrade to Go 1.20.
// TODO(codesome): Put back in the pool (non-trivial).
chk, err = h.head.opts.ChunkPool.Get(s.headChunks.chunk.Encoding(), newB)
if err != nil {
return nil, 0, err
}
}
s.Unlock()
return &safeHeadChunk{
Chunk: chk,
s: s,
cid: cid,
isoState: h.isoState,
}, maxTime, nil
}
// chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk.
// If headChunk is false, it means that the returned *memChunk
// (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
// if isOpen is true, it means that the returned *memChunk is used for appends.
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk, isOpen bool, err error) {
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
// is >= len(s.mmappedChunks), it represents one of the chunks on s.headChunks linked list.
// The order of elemens is different for slice and linked list.
// For s.mmappedChunks slice newer chunks are appended to it.
// For s.headChunks list newer chunks are prepended to it.
//
// memSeries {
// mmappedChunks: [t0, t1, t2]
// headChunk: {t5}->{t4}->{t3}
// }
ix := int(id) - int(s.firstChunkID)
var headChunksLen int
if s.headChunks != nil {
headChunksLen = s.headChunks.len()
}
if ix < 0 || ix > len(s.mmappedChunks)+headChunksLen-1 {
return nil, false, false, storage.ErrNotFound
}
if ix < len(s.mmappedChunks) {
chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
if err != nil {
var cerr *chunks.CorruptionErr
if errors.As(err, &cerr) {
panic(err)
}
return nil, false, false, err
}
mc := memChunkPool.Get().(*memChunk)
mc.chunk = chk
mc.minTime = s.mmappedChunks[ix].minTime
mc.maxTime = s.mmappedChunks[ix].maxTime
return mc, false, false, nil
}
ix -= len(s.mmappedChunks)
offset := headChunksLen - ix - 1
// headChunks is a linked list where first element is the most recent one and the last one is the oldest.
// This order is reversed when compared with mmappedChunks, since mmappedChunks[0] is the oldest chunk,
// while headChunk.atOffset(0) would give us the most recent chunk.
// So when calling headChunk.atOffset() we need to reverse the value of ix.
elem := s.headChunks.atOffset(offset)
if elem == nil {
// This should never really happen and would mean that headChunksLen value is NOT equal
// to the length of the headChunks list.
return nil, false, false, storage.ErrNotFound
}
return elem, true, offset == 0, nil
}
// oooMergedChunks return an iterable over one or more OOO chunks for the given
// chunks.Meta reference from memory or by m-mapping it from the disk. The
// returned iterable will be a merge of all the overlapping chunks, if any,
// amongst all the chunks in the OOOHead.
// This function is not thread safe unless the caller holds a lock.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (*mergedOOOChunks, error) {
_, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
// incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
ix := int(cid) - int(s.ooo.firstOOOChunkID)
if ix < 0 || ix > len(s.ooo.oooMmappedChunks) {
return nil, storage.ErrNotFound
}
if ix == len(s.ooo.oooMmappedChunks) {
if s.ooo.oooHeadChunk == nil {
return nil, errors.New("invalid ooo head chunk")
}
}
// We create a temporary slice of chunk metas to hold the information of all
// possible chunks that may overlap with the requested chunk.
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks))
oooHeadRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
if s.ooo.oooHeadChunk != nil && s.ooo.oooHeadChunk.OverlapsClosedInterval(mint, maxt) {
// We only want to append the head chunk if this chunk existed when
// Series() was called. This brings consistency in case new data
// is added in between Series() and Chunk() calls.
if oooHeadRef == meta.OOOLastRef {
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: chunks.Meta{
// Ignoring samples added before and after the last known min and max time for this chunk.
MinTime: meta.OOOLastMinTime,
MaxTime: meta.OOOLastMaxTime,
Ref: oooHeadRef,
},
})
}
}
for i, c := range s.ooo.oooMmappedChunks {
chunkRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
// We can skip chunks that came in later than the last known OOOLastRef.
if chunkRef > meta.OOOLastRef {
break
}
switch {
case chunkRef == meta.OOOLastRef:
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: chunks.Meta{
MinTime: meta.OOOLastMinTime,
MaxTime: meta.OOOLastMaxTime,
Ref: chunkRef,
},
ref: c.ref,
origMinT: c.minTime,
origMaxT: c.maxTime,
})
case c.OverlapsClosedInterval(mint, maxt):
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
meta: chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: chunkRef,
},
ref: c.ref,
})
}
}
// Next we want to sort all the collected chunks by min time so we can find
// those that overlap and stop when we know the rest don't.
slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef)
mc := &mergedOOOChunks{}
absoluteMax := int64(math.MinInt64)
for _, c := range tmpChks {
if c.meta.Ref != meta.Ref && (len(mc.chunkIterables) == 0 || c.meta.MinTime > absoluteMax) {
continue
}
var iterable chunkenc.Iterable
if c.meta.Ref == oooHeadRef {
var xor *chunkenc.XORChunk
var err error
// If head chunk min and max time match the meta OOO markers
// that means that the chunk has not expanded so we can append
// it as it is.
if s.ooo.oooHeadChunk.minTime == meta.OOOLastMinTime && s.ooo.oooHeadChunk.maxTime == meta.OOOLastMaxTime {
xor, err = s.ooo.oooHeadChunk.chunk.ToXOR() // TODO(jesus.vazquez) (This is an optimization idea that has no priority and might not be that useful) See if we could use a copy of the underlying slice. That would leave the more expensive ToXOR() function only for the usecase where Bytes() is called.
} else {
// We need to remove samples that are outside of the markers
xor, err = s.ooo.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime)
}
if err != nil {
return nil, fmt.Errorf("failed to convert ooo head chunk to xor chunk: %w", err)
}
iterable = xor
} else {
chk, err := cdm.Chunk(c.ref)
if err != nil {
var cerr *chunks.CorruptionErr
if errors.As(err, &cerr) {
return nil, fmt.Errorf("invalid ooo mmapped chunk: %w", err)
}
return nil, err
}
if c.meta.Ref == meta.OOOLastRef &&
(c.origMinT != meta.OOOLastMinTime || c.origMaxT != meta.OOOLastMaxTime) {
// The head expanded and was memory mapped so now we need to
// wrap the chunk within a chunk that doesnt allows us to iterate
// through samples out of the OOOLastMinT and OOOLastMaxT
// markers.
iterable = boundedIterable{chk, meta.OOOLastMinTime, meta.OOOLastMaxTime}
} else {
iterable = chk
}
}
mc.chunkIterables = append(mc.chunkIterables, iterable)
if c.meta.MaxTime > absoluteMax {
absoluteMax = c.meta.MaxTime
}
}
return mc, nil
}
var _ chunkenc.Iterable = &mergedOOOChunks{}
// mergedOOOChunks holds the list of iterables for overlapping chunks.
type mergedOOOChunks struct {
chunkIterables []chunkenc.Iterable
}
func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables)
}
var _ chunkenc.Iterable = &boundedIterable{}
// boundedIterable is an implementation of chunkenc.Iterable that uses a
// boundedIterator that only iterates through samples which timestamps are
// >= minT and <= maxT.
type boundedIterable struct {
chunk chunkenc.Chunk
minT int64
maxT int64
}
func (b boundedIterable) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
it := b.chunk.Iterator(iterator)
if it == nil {
panic("iterator shouldn't be nil")
}
return boundedIterator{it, b.minT, b.maxT}
}
var _ chunkenc.Iterator = &boundedIterator{}
// boundedIterator is an implementation of Iterator that only iterates through
// samples which timestamps are >= minT and <= maxT.
type boundedIterator struct {
chunkenc.Iterator
minT int64
maxT int64
}
// Next the first time its called it will advance as many positions as necessary
// until its able to find a sample within the bounds minT and maxT.
// If there are samples within bounds it will advance one by one amongst them.
// If there are no samples within bounds it will return false.
func (b boundedIterator) Next() chunkenc.ValueType {
for b.Iterator.Next() == chunkenc.ValFloat {
t, _ := b.Iterator.At()
switch {
case t < b.minT:
continue
case t > b.maxT:
return chunkenc.ValNone
default:
return chunkenc.ValFloat
}
}
return chunkenc.ValNone
}
func (b boundedIterator) Seek(t int64) chunkenc.ValueType {
if t < b.minT {
// We must seek at least up to b.minT if it is asked for something before that.
val := b.Iterator.Seek(b.minT)
if !(val == chunkenc.ValFloat) {
return chunkenc.ValNone
}
t, _ := b.Iterator.At()
if t <= b.maxT {
return chunkenc.ValFloat
}
}
if t > b.maxT {
// We seek anyway so that the subsequent Next() calls will also return false.
b.Iterator.Seek(t)
return chunkenc.ValNone
}
return b.Iterator.Seek(t)
}
// safeHeadChunk makes sure that the chunk can be accessed without a race condition.
type safeHeadChunk struct {
chunkenc.Chunk
s *memSeries
cid chunks.HeadChunkID
isoState *isolationState
}
func (c *safeHeadChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
c.s.Lock()
it := c.s.iterator(c.cid, c.Chunk, c.isoState, reuseIter)
c.s.Unlock()
return it
}
// iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range.
// It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator {
ix := int(id) - int(s.firstChunkID)
numSamples := c.NumSamples()
stopAfter := numSamples
if isoState != nil && !isoState.IsolationDisabled() {
totalSamples := 0 // Total samples in this series.
previousSamples := 0 // Samples before this chunk.
for j, d := range s.mmappedChunks {
totalSamples += int(d.numSamples)
if j < ix {
previousSamples += int(d.numSamples)
}
}
ix -= len(s.mmappedChunks)
if s.headChunks != nil {
// Iterate all head chunks from the oldest to the newest.
headChunksLen := s.headChunks.len()
for j := headChunksLen - 1; j >= 0; j-- {
chk := s.headChunks.atOffset(j)
chkSamples := chk.chunk.NumSamples()
totalSamples += chkSamples
// Chunk ID is len(s.mmappedChunks) + $(headChunks list position).
// Where $(headChunks list position) is zero for the oldest chunk and $(s.headChunks.len() - 1)
// for the newest (open) chunk.
if headChunksLen-1-j < ix {
previousSamples += chkSamples
}
}
}
// Removing the extra transactionIDs that are relevant for samples that
// come after this chunk, from the total transactionIDs.
appendIDsToConsider := int(s.txs.txIDCount) - (totalSamples - (previousSamples + numSamples))
// Iterate over the appendIDs, find the first one that the isolation state says not
// to return.
it := s.txs.iterator()
for index := 0; index < appendIDsToConsider; index++ {
appendID := it.At()
if appendID <= isoState.maxAppendID { // Easy check first.
if _, ok := isoState.incompleteAppends[appendID]; !ok {
it.Next()
continue
}
}
stopAfter = numSamples - (appendIDsToConsider - index)
if stopAfter < 0 {
stopAfter = 0 // Stopped in a previous chunk.
}
break
}
}
if stopAfter == 0 {
return chunkenc.NewNopIterator()
}
if stopAfter == numSamples {
return c.Iterator(it)
}
return makeStopIterator(c, it, stopAfter)
}
// stopIterator wraps an Iterator, but only returns the first
// stopAfter values, if initialized with i=-1.
type stopIterator struct {
chunkenc.Iterator
i, stopAfter int
}
func (it *stopIterator) Next() chunkenc.ValueType {
if it.i+1 >= it.stopAfter {
return chunkenc.ValNone
}
it.i++
return it.Iterator.Next()
}
func makeStopIterator(c chunkenc.Chunk, it chunkenc.Iterator, stopAfter int) chunkenc.Iterator {
// Re-use the Iterator object if it is a stopIterator.
if stopIter, ok := it.(*stopIterator); ok {
stopIter.Iterator = c.Iterator(stopIter.Iterator)
stopIter.i = -1
stopIter.stopAfter = stopAfter
return stopIter
}
return &stopIterator{
Iterator: c.Iterator(it),
i: -1,
stopAfter: stopAfter,
}
}