mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
787 lines
24 KiB
787 lines
24 KiB
// Copyright 2021 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package tsdb |
|
|
|
import ( |
|
"context" |
|
"math" |
|
"sync" |
|
|
|
"github.com/go-kit/log/level" |
|
"github.com/pkg/errors" |
|
"golang.org/x/exp/slices" |
|
|
|
"github.com/prometheus/prometheus/model/labels" |
|
"github.com/prometheus/prometheus/storage" |
|
"github.com/prometheus/prometheus/tsdb/chunkenc" |
|
"github.com/prometheus/prometheus/tsdb/chunks" |
|
"github.com/prometheus/prometheus/tsdb/index" |
|
) |
|
|
|
func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { |
|
return h.exemplars.ExemplarQuerier(ctx) |
|
} |
|
|
|
// Index returns an IndexReader against the block. |
|
func (h *Head) Index() (IndexReader, error) { |
|
return h.indexRange(math.MinInt64, math.MaxInt64), nil |
|
} |
|
|
|
func (h *Head) indexRange(mint, maxt int64) *headIndexReader { |
|
if hmin := h.MinTime(); hmin > mint { |
|
mint = hmin |
|
} |
|
return &headIndexReader{head: h, mint: mint, maxt: maxt} |
|
} |
|
|
|
type headIndexReader struct { |
|
head *Head |
|
mint, maxt int64 |
|
} |
|
|
|
func (h *headIndexReader) Close() error { |
|
return nil |
|
} |
|
|
|
func (h *headIndexReader) Symbols() index.StringIter { |
|
return h.head.postings.Symbols() |
|
} |
|
|
|
// SortedLabelValues returns label values present in the head for the |
|
// specific label name that are within the time range mint to maxt. |
|
// If matchers are specified the returned result set is reduced |
|
// to label values of metrics matching the matchers. |
|
func (h *headIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { |
|
values, err := h.LabelValues(ctx, name, matchers...) |
|
if err == nil { |
|
slices.Sort(values) |
|
} |
|
return values, err |
|
} |
|
|
|
// LabelValues returns label values present in the head for the |
|
// specific label name that are within the time range mint to maxt. |
|
// If matchers are specified the returned result set is reduced |
|
// to label values of metrics matching the matchers. |
|
func (h *headIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { |
|
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { |
|
return []string{}, nil |
|
} |
|
|
|
if len(matchers) == 0 { |
|
return h.head.postings.LabelValues(ctx, name), nil |
|
} |
|
|
|
return labelValuesWithMatchers(ctx, h, name, matchers...) |
|
} |
|
|
|
// LabelNames returns all the unique label names present in the head |
|
// that are within the time range mint to maxt. |
|
func (h *headIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) { |
|
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { |
|
return []string{}, nil |
|
} |
|
|
|
if len(matchers) == 0 { |
|
labelNames := h.head.postings.LabelNames() |
|
slices.Sort(labelNames) |
|
return labelNames, nil |
|
} |
|
|
|
return labelNamesWithMatchers(ctx, h, matchers...) |
|
} |
|
|
|
// Postings returns the postings list iterator for the label pairs. |
|
func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { |
|
switch len(values) { |
|
case 0: |
|
return index.EmptyPostings(), nil |
|
case 1: |
|
return h.head.postings.Get(name, values[0]), nil |
|
default: |
|
res := make([]index.Postings, 0, len(values)) |
|
for _, value := range values { |
|
if p := h.head.postings.Get(name, value); !index.IsEmptyPostingsType(p) { |
|
res = append(res, p) |
|
} |
|
} |
|
return index.Merge(ctx, res...), nil |
|
} |
|
} |
|
|
|
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { |
|
series := make([]*memSeries, 0, 128) |
|
|
|
// Fetch all the series only once. |
|
for p.Next() { |
|
s := h.head.series.getByID(chunks.HeadSeriesRef(p.At())) |
|
if s == nil { |
|
level.Debug(h.head.logger).Log("msg", "Looked up series not found") |
|
} else { |
|
series = append(series, s) |
|
} |
|
} |
|
if err := p.Err(); err != nil { |
|
return index.ErrPostings(errors.Wrap(err, "expand postings")) |
|
} |
|
|
|
slices.SortFunc(series, func(a, b *memSeries) int { |
|
return labels.Compare(a.lset, b.lset) |
|
}) |
|
|
|
// Convert back to list. |
|
ep := make([]storage.SeriesRef, 0, len(series)) |
|
for _, p := range series { |
|
ep = append(ep, storage.SeriesRef(p.ref)) |
|
} |
|
return index.NewListPostings(ep) |
|
} |
|
|
|
// Series returns the series for the given reference. |
|
func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { |
|
s := h.head.series.getByID(chunks.HeadSeriesRef(ref)) |
|
|
|
if s == nil { |
|
h.head.metrics.seriesNotFound.Inc() |
|
return storage.ErrNotFound |
|
} |
|
builder.Assign(s.lset) |
|
|
|
s.Lock() |
|
defer s.Unlock() |
|
|
|
*chks = (*chks)[:0] |
|
|
|
for i, c := range s.mmappedChunks { |
|
// Do not expose chunks that are outside of the specified range. |
|
if !c.OverlapsClosedInterval(h.mint, h.maxt) { |
|
continue |
|
} |
|
*chks = append(*chks, chunks.Meta{ |
|
MinTime: c.minTime, |
|
MaxTime: c.maxTime, |
|
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))), |
|
}) |
|
} |
|
|
|
if s.headChunks != nil { |
|
var maxTime int64 |
|
var i, j int |
|
for i = s.headChunks.len() - 1; i >= 0; i-- { |
|
chk := s.headChunks.atOffset(i) |
|
if i == 0 { |
|
// Set the head chunk as open (being appended to) for the first headChunk. |
|
maxTime = math.MaxInt64 |
|
} else { |
|
maxTime = chk.maxTime |
|
} |
|
if chk.OverlapsClosedInterval(h.mint, h.maxt) { |
|
*chks = append(*chks, chunks.Meta{ |
|
MinTime: chk.minTime, |
|
MaxTime: maxTime, |
|
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))), |
|
}) |
|
} |
|
j++ |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// headChunkID returns the HeadChunkID referred to by the given position. |
|
// * 0 <= pos < len(s.mmappedChunks) refer to s.mmappedChunks[pos] |
|
// * pos >= len(s.mmappedChunks) refers to s.headChunks linked list |
|
func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID { |
|
return chunks.HeadChunkID(pos) + s.firstChunkID |
|
} |
|
|
|
// oooHeadChunkID returns the HeadChunkID referred to by the given position. |
|
// * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos] |
|
// * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk |
|
// The caller must ensure that s.ooo is not nil. |
|
func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID { |
|
return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID |
|
} |
|
|
|
// LabelValueFor returns label value for the given label name in the series referred to by ID. |
|
func (h *headIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef, label string) (string, error) { |
|
memSeries := h.head.series.getByID(chunks.HeadSeriesRef(id)) |
|
if memSeries == nil { |
|
return "", storage.ErrNotFound |
|
} |
|
|
|
value := memSeries.lset.Get(label) |
|
if value == "" { |
|
return "", storage.ErrNotFound |
|
} |
|
|
|
return value, nil |
|
} |
|
|
|
// LabelNamesFor returns all the label names for the series referred to by IDs. |
|
// The names returned are sorted. |
|
func (h *headIndexReader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error) { |
|
namesMap := make(map[string]struct{}) |
|
for _, id := range ids { |
|
if ctx.Err() != nil { |
|
return nil, ctx.Err() |
|
} |
|
memSeries := h.head.series.getByID(chunks.HeadSeriesRef(id)) |
|
if memSeries == nil { |
|
return nil, storage.ErrNotFound |
|
} |
|
memSeries.lset.Range(func(lbl labels.Label) { |
|
namesMap[lbl.Name] = struct{}{} |
|
}) |
|
} |
|
names := make([]string, 0, len(namesMap)) |
|
for name := range namesMap { |
|
names = append(names, name) |
|
} |
|
slices.Sort(names) |
|
return names, nil |
|
} |
|
|
|
// Chunks returns a ChunkReader against the block. |
|
func (h *Head) Chunks() (ChunkReader, error) { |
|
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State(math.MinInt64, math.MaxInt64)) |
|
} |
|
|
|
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) { |
|
h.closedMtx.Lock() |
|
defer h.closedMtx.Unlock() |
|
if h.closed { |
|
return nil, errors.New("can't read from a closed head") |
|
} |
|
if hmin := h.MinTime(); hmin > mint { |
|
mint = hmin |
|
} |
|
return &headChunkReader{ |
|
head: h, |
|
mint: mint, |
|
maxt: maxt, |
|
isoState: is, |
|
}, nil |
|
} |
|
|
|
type headChunkReader struct { |
|
head *Head |
|
mint, maxt int64 |
|
isoState *isolationState |
|
} |
|
|
|
func (h *headChunkReader) Close() error { |
|
if h.isoState != nil { |
|
h.isoState.Close() |
|
} |
|
return nil |
|
} |
|
|
|
// Chunk returns the chunk for the reference number. |
|
func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) { |
|
chk, _, err := h.chunk(meta, false) |
|
return chk, err |
|
} |
|
|
|
// ChunkWithCopy returns the chunk for the reference number. |
|
// If the chunk is the in-memory chunk, then it makes a copy and returns the copied chunk. |
|
func (h *headChunkReader) ChunkWithCopy(meta chunks.Meta) (chunkenc.Chunk, int64, error) { |
|
return h.chunk(meta, true) |
|
} |
|
|
|
// chunk returns the chunk for the reference number. |
|
// If copyLastChunk is true, then it makes a copy of the head chunk if asked for it. |
|
// Also returns max time of the chunk. |
|
func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, int64, error) { |
|
sid, cid := chunks.HeadChunkRef(meta.Ref).Unpack() |
|
|
|
s := h.head.series.getByID(sid) |
|
// This means that the series has been garbage collected. |
|
if s == nil { |
|
return nil, 0, storage.ErrNotFound |
|
} |
|
|
|
s.Lock() |
|
c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool) |
|
if err != nil { |
|
s.Unlock() |
|
return nil, 0, err |
|
} |
|
defer func() { |
|
if !headChunk { |
|
// Set this to nil so that Go GC can collect it after it has been used. |
|
c.chunk = nil |
|
c.prev = nil |
|
h.head.memChunkPool.Put(c) |
|
} |
|
}() |
|
|
|
// This means that the chunk is outside the specified range. |
|
if !c.OverlapsClosedInterval(h.mint, h.maxt) { |
|
s.Unlock() |
|
return nil, 0, storage.ErrNotFound |
|
} |
|
|
|
chk, maxTime := c.chunk, c.maxTime |
|
if headChunk && isOpen && copyLastChunk { |
|
// The caller may ask to copy the head chunk in order to take the |
|
// bytes of the chunk without causing the race between read and append. |
|
b := s.headChunks.chunk.Bytes() |
|
newB := make([]byte, len(b)) |
|
copy(newB, b) // TODO(codesome): Use bytes.Clone() when we upgrade to Go 1.20. |
|
// TODO(codesome): Put back in the pool (non-trivial). |
|
chk, err = h.head.opts.ChunkPool.Get(s.headChunks.chunk.Encoding(), newB) |
|
if err != nil { |
|
return nil, 0, err |
|
} |
|
} |
|
s.Unlock() |
|
|
|
return &safeHeadChunk{ |
|
Chunk: chk, |
|
s: s, |
|
cid: cid, |
|
isoState: h.isoState, |
|
}, maxTime, nil |
|
} |
|
|
|
// chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk. |
|
// If headChunk is false, it means that the returned *memChunk |
|
// (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. |
|
// if isOpen is true, it means that the returned *memChunk is used for appends. |
|
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk, isOpen bool, err error) { |
|
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are |
|
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. |
|
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix |
|
// is >= len(s.mmappedChunks), it represents one of the chunks on s.headChunks linked list. |
|
// The order of elemens is different for slice and linked list. |
|
// For s.mmappedChunks slice newer chunks are appended to it. |
|
// For s.headChunks list newer chunks are prepended to it. |
|
// |
|
// memSeries { |
|
// mmappedChunks: [t0, t1, t2] |
|
// headChunk: {t5}->{t4}->{t3} |
|
// } |
|
ix := int(id) - int(s.firstChunkID) |
|
|
|
var headChunksLen int |
|
if s.headChunks != nil { |
|
headChunksLen = s.headChunks.len() |
|
} |
|
|
|
if ix < 0 || ix > len(s.mmappedChunks)+headChunksLen-1 { |
|
return nil, false, false, storage.ErrNotFound |
|
} |
|
|
|
if ix < len(s.mmappedChunks) { |
|
chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) |
|
if err != nil { |
|
if _, ok := err.(*chunks.CorruptionErr); ok { |
|
panic(err) |
|
} |
|
return nil, false, false, err |
|
} |
|
mc := memChunkPool.Get().(*memChunk) |
|
mc.chunk = chk |
|
mc.minTime = s.mmappedChunks[ix].minTime |
|
mc.maxTime = s.mmappedChunks[ix].maxTime |
|
return mc, false, false, nil |
|
} |
|
|
|
ix -= len(s.mmappedChunks) |
|
|
|
offset := headChunksLen - ix - 1 |
|
// headChunks is a linked list where first element is the most recent one and the last one is the oldest. |
|
// This order is reversed when compared with mmappedChunks, since mmappedChunks[0] is the oldest chunk, |
|
// while headChunk.atOffset(0) would give us the most recent chunk. |
|
// So when calling headChunk.atOffset() we need to reverse the value of ix. |
|
elem := s.headChunks.atOffset(offset) |
|
if elem == nil { |
|
// This should never really happen and would mean that headChunksLen value is NOT equal |
|
// to the length of the headChunks list. |
|
return nil, false, false, storage.ErrNotFound |
|
} |
|
return elem, true, offset == 0, nil |
|
} |
|
|
|
// oooMergedChunk returns the requested chunk based on the given chunks.Meta |
|
// reference from memory or by m-mapping it from the disk. The returned chunk |
|
// might be a merge of all the overlapping chunks, if any, amongst all the |
|
// chunks in the OOOHead. |
|
// This function is not thread safe unless the caller holds a lock. |
|
// The caller must ensure that s.ooo is not nil. |
|
func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (chunk *mergedOOOChunks, err error) { |
|
_, cid := chunks.HeadChunkRef(meta.Ref).Unpack() |
|
|
|
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are |
|
// incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index. |
|
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix |
|
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk. |
|
ix := int(cid) - int(s.ooo.firstOOOChunkID) |
|
if ix < 0 || ix > len(s.ooo.oooMmappedChunks) { |
|
return nil, storage.ErrNotFound |
|
} |
|
|
|
if ix == len(s.ooo.oooMmappedChunks) { |
|
if s.ooo.oooHeadChunk == nil { |
|
return nil, errors.New("invalid ooo head chunk") |
|
} |
|
} |
|
|
|
// We create a temporary slice of chunk metas to hold the information of all |
|
// possible chunks that may overlap with the requested chunk. |
|
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks)) |
|
|
|
oooHeadRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) |
|
if s.ooo.oooHeadChunk != nil && s.ooo.oooHeadChunk.OverlapsClosedInterval(mint, maxt) { |
|
// We only want to append the head chunk if this chunk existed when |
|
// Series() was called. This brings consistency in case new data |
|
// is added in between Series() and Chunk() calls. |
|
if oooHeadRef == meta.OOOLastRef { |
|
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ |
|
meta: chunks.Meta{ |
|
// Ignoring samples added before and after the last known min and max time for this chunk. |
|
MinTime: meta.OOOLastMinTime, |
|
MaxTime: meta.OOOLastMaxTime, |
|
Ref: oooHeadRef, |
|
}, |
|
}) |
|
} |
|
} |
|
|
|
for i, c := range s.ooo.oooMmappedChunks { |
|
chunkRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i))) |
|
// We can skip chunks that came in later than the last known OOOLastRef. |
|
if chunkRef > meta.OOOLastRef { |
|
break |
|
} |
|
|
|
switch { |
|
case chunkRef == meta.OOOLastRef: |
|
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ |
|
meta: chunks.Meta{ |
|
MinTime: meta.OOOLastMinTime, |
|
MaxTime: meta.OOOLastMaxTime, |
|
Ref: chunkRef, |
|
}, |
|
ref: c.ref, |
|
origMinT: c.minTime, |
|
origMaxT: c.maxTime, |
|
}) |
|
case c.OverlapsClosedInterval(mint, maxt): |
|
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{ |
|
meta: chunks.Meta{ |
|
MinTime: c.minTime, |
|
MaxTime: c.maxTime, |
|
Ref: chunkRef, |
|
}, |
|
ref: c.ref, |
|
}) |
|
} |
|
} |
|
|
|
// Next we want to sort all the collected chunks by min time so we can find |
|
// those that overlap and stop when we know the rest don't. |
|
slices.SortFunc(tmpChks, refLessByMinTimeAndMinRef) |
|
|
|
mc := &mergedOOOChunks{} |
|
absoluteMax := int64(math.MinInt64) |
|
for _, c := range tmpChks { |
|
if c.meta.Ref != meta.Ref && (len(mc.chunks) == 0 || c.meta.MinTime > absoluteMax) { |
|
continue |
|
} |
|
if c.meta.Ref == oooHeadRef { |
|
var xor *chunkenc.XORChunk |
|
// If head chunk min and max time match the meta OOO markers |
|
// that means that the chunk has not expanded so we can append |
|
// it as it is. |
|
if s.ooo.oooHeadChunk.minTime == meta.OOOLastMinTime && s.ooo.oooHeadChunk.maxTime == meta.OOOLastMaxTime { |
|
xor, err = s.ooo.oooHeadChunk.chunk.ToXOR() // TODO(jesus.vazquez) (This is an optimization idea that has no priority and might not be that useful) See if we could use a copy of the underlying slice. That would leave the more expensive ToXOR() function only for the usecase where Bytes() is called. |
|
} else { |
|
// We need to remove samples that are outside of the markers |
|
xor, err = s.ooo.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime) |
|
} |
|
if err != nil { |
|
return nil, errors.Wrap(err, "failed to convert ooo head chunk to xor chunk") |
|
} |
|
c.meta.Chunk = xor |
|
} else { |
|
chk, err := cdm.Chunk(c.ref) |
|
if err != nil { |
|
if _, ok := err.(*chunks.CorruptionErr); ok { |
|
return nil, errors.Wrap(err, "invalid ooo mmapped chunk") |
|
} |
|
return nil, err |
|
} |
|
if c.meta.Ref == meta.OOOLastRef && |
|
(c.origMinT != meta.OOOLastMinTime || c.origMaxT != meta.OOOLastMaxTime) { |
|
// The head expanded and was memory mapped so now we need to |
|
// wrap the chunk within a chunk that doesnt allows us to iterate |
|
// through samples out of the OOOLastMinT and OOOLastMaxT |
|
// markers. |
|
c.meta.Chunk = boundedChunk{chk, meta.OOOLastMinTime, meta.OOOLastMaxTime} |
|
} else { |
|
c.meta.Chunk = chk |
|
} |
|
} |
|
mc.chunks = append(mc.chunks, c.meta) |
|
if c.meta.MaxTime > absoluteMax { |
|
absoluteMax = c.meta.MaxTime |
|
} |
|
} |
|
|
|
return mc, nil |
|
} |
|
|
|
var _ chunkenc.Chunk = &mergedOOOChunks{} |
|
|
|
// mergedOOOChunks holds the list of overlapping chunks. This struct satisfies |
|
// chunkenc.Chunk. |
|
type mergedOOOChunks struct { |
|
chunks []chunks.Meta |
|
} |
|
|
|
// Bytes is a very expensive method because its calling the iterator of all the |
|
// chunks in the mergedOOOChunk and building a new chunk with the samples. |
|
func (o mergedOOOChunks) Bytes() []byte { |
|
xc := chunkenc.NewXORChunk() |
|
app, err := xc.Appender() |
|
if err != nil { |
|
panic(err) |
|
} |
|
it := o.Iterator(nil) |
|
for it.Next() == chunkenc.ValFloat { |
|
t, v := it.At() |
|
app.Append(t, v) |
|
} |
|
|
|
return xc.Bytes() |
|
} |
|
|
|
func (o mergedOOOChunks) Encoding() chunkenc.Encoding { |
|
return chunkenc.EncXOR |
|
} |
|
|
|
func (o mergedOOOChunks) Appender() (chunkenc.Appender, error) { |
|
return nil, errors.New("can't append to mergedOOOChunks") |
|
} |
|
|
|
func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { |
|
return storage.ChainSampleIteratorFromMetas(iterator, o.chunks) |
|
} |
|
|
|
func (o mergedOOOChunks) NumSamples() int { |
|
samples := 0 |
|
for _, c := range o.chunks { |
|
samples += c.Chunk.NumSamples() |
|
} |
|
return samples |
|
} |
|
|
|
func (o mergedOOOChunks) Compact() {} |
|
|
|
var _ chunkenc.Chunk = &boundedChunk{} |
|
|
|
// boundedChunk is an implementation of chunkenc.Chunk that uses a |
|
// boundedIterator that only iterates through samples which timestamps are |
|
// >= minT and <= maxT |
|
type boundedChunk struct { |
|
chunkenc.Chunk |
|
minT int64 |
|
maxT int64 |
|
} |
|
|
|
func (b boundedChunk) Bytes() []byte { |
|
xor := chunkenc.NewXORChunk() |
|
a, _ := xor.Appender() |
|
it := b.Iterator(nil) |
|
for it.Next() == chunkenc.ValFloat { |
|
t, v := it.At() |
|
a.Append(t, v) |
|
} |
|
return xor.Bytes() |
|
} |
|
|
|
func (b boundedChunk) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { |
|
it := b.Chunk.Iterator(iterator) |
|
if it == nil { |
|
panic("iterator shouldn't be nil") |
|
} |
|
return boundedIterator{it, b.minT, b.maxT} |
|
} |
|
|
|
var _ chunkenc.Iterator = &boundedIterator{} |
|
|
|
// boundedIterator is an implementation of Iterator that only iterates through |
|
// samples which timestamps are >= minT and <= maxT |
|
type boundedIterator struct { |
|
chunkenc.Iterator |
|
minT int64 |
|
maxT int64 |
|
} |
|
|
|
// Next the first time its called it will advance as many positions as necessary |
|
// until its able to find a sample within the bounds minT and maxT. |
|
// If there are samples within bounds it will advance one by one amongst them. |
|
// If there are no samples within bounds it will return false. |
|
func (b boundedIterator) Next() chunkenc.ValueType { |
|
for b.Iterator.Next() == chunkenc.ValFloat { |
|
t, _ := b.Iterator.At() |
|
switch { |
|
case t < b.minT: |
|
continue |
|
case t > b.maxT: |
|
return chunkenc.ValNone |
|
default: |
|
return chunkenc.ValFloat |
|
} |
|
} |
|
return chunkenc.ValNone |
|
} |
|
|
|
func (b boundedIterator) Seek(t int64) chunkenc.ValueType { |
|
if t < b.minT { |
|
// We must seek at least up to b.minT if it is asked for something before that. |
|
val := b.Iterator.Seek(b.minT) |
|
if !(val == chunkenc.ValFloat) { |
|
return chunkenc.ValNone |
|
} |
|
t, _ := b.Iterator.At() |
|
if t <= b.maxT { |
|
return chunkenc.ValFloat |
|
} |
|
} |
|
if t > b.maxT { |
|
// We seek anyway so that the subsequent Next() calls will also return false. |
|
b.Iterator.Seek(t) |
|
return chunkenc.ValNone |
|
} |
|
return b.Iterator.Seek(t) |
|
} |
|
|
|
// safeHeadChunk makes sure that the chunk can be accessed without a race condition |
|
type safeHeadChunk struct { |
|
chunkenc.Chunk |
|
s *memSeries |
|
cid chunks.HeadChunkID |
|
isoState *isolationState |
|
} |
|
|
|
func (c *safeHeadChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { |
|
c.s.Lock() |
|
it := c.s.iterator(c.cid, c.Chunk, c.isoState, reuseIter) |
|
c.s.Unlock() |
|
return it |
|
} |
|
|
|
// iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range. |
|
// It is unsafe to call this concurrently with s.append(...) without holding the series lock. |
|
func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator { |
|
ix := int(id) - int(s.firstChunkID) |
|
|
|
numSamples := c.NumSamples() |
|
stopAfter := numSamples |
|
|
|
if isoState != nil && !isoState.IsolationDisabled() { |
|
totalSamples := 0 // Total samples in this series. |
|
previousSamples := 0 // Samples before this chunk. |
|
|
|
for j, d := range s.mmappedChunks { |
|
totalSamples += int(d.numSamples) |
|
if j < ix { |
|
previousSamples += int(d.numSamples) |
|
} |
|
} |
|
|
|
ix -= len(s.mmappedChunks) |
|
if s.headChunks != nil { |
|
// Iterate all head chunks from the oldest to the newest. |
|
headChunksLen := s.headChunks.len() |
|
for j := headChunksLen - 1; j >= 0; j-- { |
|
chk := s.headChunks.atOffset(j) |
|
chkSamples := chk.chunk.NumSamples() |
|
totalSamples += chkSamples |
|
// Chunk ID is len(s.mmappedChunks) + $(headChunks list position). |
|
// Where $(headChunks list position) is zero for the oldest chunk and $(s.headChunks.len() - 1) |
|
// for the newest (open) chunk. |
|
if headChunksLen-1-j < ix { |
|
previousSamples += chkSamples |
|
} |
|
} |
|
} |
|
|
|
// Removing the extra transactionIDs that are relevant for samples that |
|
// come after this chunk, from the total transactionIDs. |
|
appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples)) |
|
|
|
// Iterate over the appendIDs, find the first one that the isolation state says not |
|
// to return. |
|
it := s.txs.iterator() |
|
for index := 0; index < appendIDsToConsider; index++ { |
|
appendID := it.At() |
|
if appendID <= isoState.maxAppendID { // Easy check first. |
|
if _, ok := isoState.incompleteAppends[appendID]; !ok { |
|
it.Next() |
|
continue |
|
} |
|
} |
|
stopAfter = numSamples - (appendIDsToConsider - index) |
|
if stopAfter < 0 { |
|
stopAfter = 0 // Stopped in a previous chunk. |
|
} |
|
break |
|
} |
|
} |
|
|
|
if stopAfter == 0 { |
|
return chunkenc.NewNopIterator() |
|
} |
|
if stopAfter == numSamples { |
|
return c.Iterator(it) |
|
} |
|
return makeStopIterator(c, it, stopAfter) |
|
} |
|
|
|
// stopIterator wraps an Iterator, but only returns the first |
|
// stopAfter values, if initialized with i=-1. |
|
type stopIterator struct { |
|
chunkenc.Iterator |
|
|
|
i, stopAfter int |
|
} |
|
|
|
func (it *stopIterator) Next() chunkenc.ValueType { |
|
if it.i+1 >= it.stopAfter { |
|
return chunkenc.ValNone |
|
} |
|
it.i++ |
|
return it.Iterator.Next() |
|
} |
|
|
|
func makeStopIterator(c chunkenc.Chunk, it chunkenc.Iterator, stopAfter int) chunkenc.Iterator { |
|
// Re-use the Iterator object if it is a stopIterator. |
|
if stopIter, ok := it.(*stopIterator); ok { |
|
stopIter.Iterator = c.Iterator(stopIter.Iterator) |
|
stopIter.i = -1 |
|
stopIter.stopAfter = stopAfter |
|
return stopIter |
|
} |
|
|
|
return &stopIterator{ |
|
Iterator: c.Iterator(it), |
|
i: -1, |
|
stopAfter: stopAfter, |
|
} |
|
}
|
|
|