|
|
|
@ -28,7 +28,6 @@ import (
|
|
|
|
|
"sort"
|
|
|
|
|
"unsafe"
|
|
|
|
|
|
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
|
"golang.org/x/exp/slices"
|
|
|
|
|
|
|
|
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
|
|
@ -172,7 +171,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
|
|
|
|
|
d := encoding.Decbuf{B: b[:len(b)-4]}
|
|
|
|
|
|
|
|
|
|
if d.Crc32(castagnoliTable) != expCRC {
|
|
|
|
|
return nil, errors.Wrap(encoding.ErrInvalidChecksum, "read TOC")
|
|
|
|
|
return nil, fmt.Errorf("read TOC: %w", encoding.ErrInvalidChecksum)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
toc := &TOC{
|
|
|
|
@ -197,7 +196,7 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
|
|
|
|
defer df.Close() // Close for platform windows.
|
|
|
|
|
|
|
|
|
|
if err := os.RemoveAll(fn); err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "remove any existing index at path")
|
|
|
|
|
return nil, fmt.Errorf("remove any existing index at path: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Main index file we are building.
|
|
|
|
@ -216,7 +215,7 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if err := df.Sync(); err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "sync dir")
|
|
|
|
|
return nil, fmt.Errorf("sync dir: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
iw := &Writer{
|
|
|
|
@ -288,7 +287,7 @@ func (fw *FileWriter) Write(bufs ...[]byte) error {
|
|
|
|
|
// Once we move to compressed/varint representations in those areas, this limitation
|
|
|
|
|
// can be lifted.
|
|
|
|
|
if fw.pos > 16*math.MaxUint32 {
|
|
|
|
|
return errors.Errorf("%q exceeding max size of 64GiB", fw.name)
|
|
|
|
|
return fmt.Errorf("%q exceeding max size of 64GiB", fw.name)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
@ -315,7 +314,7 @@ func (fw *FileWriter) AddPadding(size int) error {
|
|
|
|
|
p = uint64(size) - p
|
|
|
|
|
|
|
|
|
|
if err := fw.Write(make([]byte, p)); err != nil {
|
|
|
|
|
return errors.Wrap(err, "add padding")
|
|
|
|
|
return fmt.Errorf("add padding: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@ -353,7 +352,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if w.stage > s {
|
|
|
|
|
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
|
|
|
|
|
return fmt.Errorf("invalid stage %q, currently at %q", s, w.stage)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Mark start of sections in table of contents.
|
|
|
|
@ -417,20 +416,20 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if labels.Compare(lset, w.lastSeries) <= 0 {
|
|
|
|
|
return errors.Errorf("out-of-order series added with label set %q", lset)
|
|
|
|
|
return fmt.Errorf("out-of-order series added with label set %q", lset)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ref < w.lastRef && !w.lastSeries.IsEmpty() {
|
|
|
|
|
return errors.Errorf("series with reference greater than %d already added", ref)
|
|
|
|
|
return fmt.Errorf("series with reference greater than %d already added", ref)
|
|
|
|
|
}
|
|
|
|
|
// We add padding to 16 bytes to increase the addressable space we get through 4 byte
|
|
|
|
|
// series references.
|
|
|
|
|
if err := w.addPadding(16); err != nil {
|
|
|
|
|
return errors.Errorf("failed to write padding bytes: %v", err)
|
|
|
|
|
return fmt.Errorf("failed to write padding bytes: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if w.f.pos%16 != 0 {
|
|
|
|
|
return errors.Errorf("series write not 16-byte aligned at %d", w.f.pos)
|
|
|
|
|
return fmt.Errorf("series write not 16-byte aligned at %d", w.f.pos)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
w.buf2.Reset()
|
|
|
|
@ -443,7 +442,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
|
|
|
|
|
if !ok {
|
|
|
|
|
nameIndex, err = w.symbols.ReverseLookup(l.Name)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err)
|
|
|
|
|
return fmt.Errorf("symbol entry for %q does not exist, %v", l.Name, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
w.labelNames[l.Name]++
|
|
|
|
@ -453,7 +452,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
|
|
|
|
|
if !ok || cacheEntry.lastValue != l.Value {
|
|
|
|
|
valueIndex, err = w.symbols.ReverseLookup(l.Value)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err)
|
|
|
|
|
return fmt.Errorf("symbol entry for %q does not exist, %v", l.Value, err)
|
|
|
|
|
}
|
|
|
|
|
w.symbolCache[l.Name] = symbolCacheEntry{
|
|
|
|
|
index: nameIndex,
|
|
|
|
@ -493,7 +492,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
|
|
|
|
|
w.buf2.PutHash(w.crc32)
|
|
|
|
|
|
|
|
|
|
if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil {
|
|
|
|
|
return errors.Wrap(err, "write series data")
|
|
|
|
|
return fmt.Errorf("write series data: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
w.lastSeries.CopyFrom(lset)
|
|
|
|
@ -514,7 +513,7 @@ func (w *Writer) AddSymbol(sym string) error {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if w.numSymbols != 0 && sym <= w.lastSymbol {
|
|
|
|
|
return errors.Errorf("symbol %q out-of-order", sym)
|
|
|
|
|
return fmt.Errorf("symbol %q out-of-order", sym)
|
|
|
|
|
}
|
|
|
|
|
w.lastSymbol = sym
|
|
|
|
|
w.numSymbols++
|
|
|
|
@ -527,7 +526,7 @@ func (w *Writer) finishSymbols() error {
|
|
|
|
|
symbolTableSize := w.f.pos - w.toc.Symbols - 4
|
|
|
|
|
// The symbol table's <len> part is 4 bytes. So the total symbol table size must be less than or equal to 2^32-1
|
|
|
|
|
if symbolTableSize > math.MaxUint32 {
|
|
|
|
|
return errors.Errorf("symbol table size exceeds %d bytes: %d", uint32(math.MaxUint32), symbolTableSize)
|
|
|
|
|
return fmt.Errorf("symbol table size exceeds %d bytes: %d", uint32(math.MaxUint32), symbolTableSize)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write out the length and symbol count.
|
|
|
|
@ -563,7 +562,7 @@ func (w *Writer) finishSymbols() error {
|
|
|
|
|
// Load in the symbol table efficiently for the rest of the index writing.
|
|
|
|
|
w.symbols, err = NewSymbols(realByteSlice(w.symbolFile.Bytes()), FormatV2, int(w.toc.Symbols))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "read symbols")
|
|
|
|
|
return fmt.Errorf("read symbols: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@ -660,7 +659,7 @@ func (w *Writer) writeLabelIndex(name string, values []uint32) error {
|
|
|
|
|
w.buf1.Reset()
|
|
|
|
|
l := w.f.pos - startPos - 4
|
|
|
|
|
if l > math.MaxUint32 {
|
|
|
|
|
return errors.Errorf("label index size exceeds 4 bytes: %d", l)
|
|
|
|
|
return fmt.Errorf("label index size exceeds 4 bytes: %d", l)
|
|
|
|
|
}
|
|
|
|
|
w.buf1.PutBE32int(int(l))
|
|
|
|
|
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
|
|
|
@ -704,7 +703,7 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
|
|
|
|
|
w.buf1.Reset()
|
|
|
|
|
l := w.f.pos - startPos - 4
|
|
|
|
|
if l > math.MaxUint32 {
|
|
|
|
|
return errors.Errorf("label indexes offset table size exceeds 4 bytes: %d", l)
|
|
|
|
|
return fmt.Errorf("label indexes offset table size exceeds 4 bytes: %d", l)
|
|
|
|
|
}
|
|
|
|
|
w.buf1.PutBE32int(int(l))
|
|
|
|
|
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
|
|
|
@ -785,7 +784,7 @@ func (w *Writer) writePostingsOffsetTable() error {
|
|
|
|
|
w.buf1.Reset()
|
|
|
|
|
l := w.f.pos - startPos - 4
|
|
|
|
|
if l > math.MaxUint32 {
|
|
|
|
|
return errors.Errorf("postings offset table size exceeds 4 bytes: %d", l)
|
|
|
|
|
return fmt.Errorf("postings offset table size exceeds 4 bytes: %d", l)
|
|
|
|
|
}
|
|
|
|
|
w.buf1.PutBE32int(int(l))
|
|
|
|
|
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
|
|
|
@ -839,7 +838,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
|
|
|
|
|
d.ConsumePadding()
|
|
|
|
|
startPos := w.toc.LabelIndices - uint64(d.Len())
|
|
|
|
|
if startPos%16 != 0 {
|
|
|
|
|
return errors.Errorf("series not 16-byte aligned at %d", startPos)
|
|
|
|
|
return fmt.Errorf("series not 16-byte aligned at %d", startPos)
|
|
|
|
|
}
|
|
|
|
|
offsets = append(offsets, uint32(startPos/16))
|
|
|
|
|
// Skip to next series.
|
|
|
|
@ -964,7 +963,7 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
|
|
|
|
|
|
|
|
|
|
for _, off := range offs {
|
|
|
|
|
if off > (1<<32)-1 {
|
|
|
|
|
return errors.Errorf("series offset %d exceeds 4 bytes", off)
|
|
|
|
|
return fmt.Errorf("series offset %d exceeds 4 bytes", off)
|
|
|
|
|
}
|
|
|
|
|
w.buf1.PutBE32(off)
|
|
|
|
|
}
|
|
|
|
@ -973,7 +972,7 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
|
|
|
|
|
l := w.buf1.Len()
|
|
|
|
|
// We convert to uint to make code compile on 32-bit systems, as math.MaxUint32 doesn't fit into int there.
|
|
|
|
|
if uint(l) > math.MaxUint32 {
|
|
|
|
|
return errors.Errorf("posting size exceeds 4 bytes: %d", l)
|
|
|
|
|
return fmt.Errorf("posting size exceeds 4 bytes: %d", l)
|
|
|
|
|
}
|
|
|
|
|
w.buf2.PutBE32int(l)
|
|
|
|
|
w.buf1.PutHash(w.crc32)
|
|
|
|
@ -1000,7 +999,7 @@ func (w *Writer) writePostings() error {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if uint64(n) != w.fP.pos {
|
|
|
|
|
return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n)
|
|
|
|
|
return fmt.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n)
|
|
|
|
|
}
|
|
|
|
|
w.f.pos += uint64(n)
|
|
|
|
|
|
|
|
|
@ -1135,26 +1134,26 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
|
|
|
|
|
|
|
|
|
// Verify header.
|
|
|
|
|
if r.b.Len() < HeaderLen {
|
|
|
|
|
return nil, errors.Wrap(encoding.ErrInvalidSize, "index header")
|
|
|
|
|
return nil, fmt.Errorf("index header: %w", encoding.ErrInvalidSize)
|
|
|
|
|
}
|
|
|
|
|
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
|
|
|
|
|
return nil, errors.Errorf("invalid magic number %x", m)
|
|
|
|
|
return nil, fmt.Errorf("invalid magic number %x", m)
|
|
|
|
|
}
|
|
|
|
|
r.version = int(r.b.Range(4, 5)[0])
|
|
|
|
|
|
|
|
|
|
if r.version != FormatV1 && r.version != FormatV2 {
|
|
|
|
|
return nil, errors.Errorf("unknown index file version %d", r.version)
|
|
|
|
|
return nil, fmt.Errorf("unknown index file version %d", r.version)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var err error
|
|
|
|
|
r.toc, err = NewTOCFromByteSlice(b)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "read TOC")
|
|
|
|
|
return nil, fmt.Errorf("read TOC: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.symbols, err = NewSymbols(r.b, r.version, int(r.toc.Symbols))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "read symbols")
|
|
|
|
|
return nil, fmt.Errorf("read symbols: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if r.version == FormatV1 {
|
|
|
|
@ -1169,7 +1168,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
|
|
|
|
r.postingsV1[string(name)][string(value)] = off
|
|
|
|
|
return nil
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "read postings table")
|
|
|
|
|
return nil, fmt.Errorf("read postings table: %w", err)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
var lastName, lastValue []byte
|
|
|
|
@ -1197,7 +1196,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
|
|
|
|
valueCount++
|
|
|
|
|
return nil
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "read postings table")
|
|
|
|
|
return nil, fmt.Errorf("read postings table: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if lastName != nil {
|
|
|
|
|
r.postings[string(lastName)] = append(r.postings[string(lastName)], postingOffset{value: string(lastValue), off: lastOff})
|
|
|
|
@ -1217,7 +1216,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
|
|
|
|
}
|
|
|
|
|
off, err := r.symbols.ReverseLookup(k)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "reverse symbol lookup")
|
|
|
|
|
return nil, fmt.Errorf("reverse symbol lookup: %w", err)
|
|
|
|
|
}
|
|
|
|
|
r.nameSymbols[off] = k
|
|
|
|
|
}
|
|
|
|
@ -1252,7 +1251,7 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "read postings table")
|
|
|
|
|
return nil, fmt.Errorf("read postings table: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return m, nil
|
|
|
|
|
}
|
|
|
|
@ -1302,7 +1301,7 @@ func (s Symbols) Lookup(o uint32) (string, error) {
|
|
|
|
|
|
|
|
|
|
if s.version == FormatV2 {
|
|
|
|
|
if int(o) >= s.seen {
|
|
|
|
|
return "", errors.Errorf("unknown symbol offset %d", o)
|
|
|
|
|
return "", fmt.Errorf("unknown symbol offset %d", o)
|
|
|
|
|
}
|
|
|
|
|
d.Skip(s.offsets[int(o/symbolFactor)])
|
|
|
|
|
// Walk until we find the one we want.
|
|
|
|
@ -1321,7 +1320,7 @@ func (s Symbols) Lookup(o uint32) (string, error) {
|
|
|
|
|
|
|
|
|
|
func (s Symbols) ReverseLookup(sym string) (uint32, error) {
|
|
|
|
|
if len(s.offsets) == 0 {
|
|
|
|
|
return 0, errors.Errorf("unknown symbol %q - no symbols", sym)
|
|
|
|
|
return 0, fmt.Errorf("unknown symbol %q - no symbols", sym)
|
|
|
|
|
}
|
|
|
|
|
i := sort.Search(len(s.offsets), func(i int) bool {
|
|
|
|
|
// Any decoding errors here will be lost, however
|
|
|
|
@ -1354,7 +1353,7 @@ func (s Symbols) ReverseLookup(sym string) (uint32, error) {
|
|
|
|
|
return 0, d.Err()
|
|
|
|
|
}
|
|
|
|
|
if lastSymbol != sym {
|
|
|
|
|
return 0, errors.Errorf("unknown symbol %q", sym)
|
|
|
|
|
return 0, fmt.Errorf("unknown symbol %q", sym)
|
|
|
|
|
}
|
|
|
|
|
if s.version == FormatV2 {
|
|
|
|
|
return uint32(res), nil
|
|
|
|
@ -1413,7 +1412,7 @@ func ReadPostingsOffsetTable(bs ByteSlice, off uint64, f func(name, value []byte
|
|
|
|
|
offsetPos := startLen - d.Len()
|
|
|
|
|
|
|
|
|
|
if keyCount := d.Uvarint(); keyCount != 2 {
|
|
|
|
|
return errors.Errorf("unexpected number of keys for postings offset table %d", keyCount)
|
|
|
|
|
return fmt.Errorf("unexpected number of keys for postings offset table %d", keyCount)
|
|
|
|
|
}
|
|
|
|
|
name := d.UvarintBytes()
|
|
|
|
|
value := d.UvarintBytes()
|
|
|
|
@ -1468,7 +1467,7 @@ func (r *Reader) SortedLabelValues(ctx context.Context, name string, matchers ..
|
|
|
|
|
// TODO(replay): Support filtering by matchers.
|
|
|
|
|
func (r *Reader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
|
|
|
|
if len(matchers) > 0 {
|
|
|
|
|
return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers)
|
|
|
|
|
return nil, fmt.Errorf("matchers parameter is not implemented: %+v", matchers)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if r.version == FormatV1 {
|
|
|
|
@ -1516,7 +1515,7 @@ func (r *Reader) LabelValues(ctx context.Context, name string, matchers ...*labe
|
|
|
|
|
d.Uvarint64() // Offset.
|
|
|
|
|
}
|
|
|
|
|
if d.Err() != nil {
|
|
|
|
|
return nil, errors.Wrap(d.Err(), "get postings offset entry")
|
|
|
|
|
return nil, fmt.Errorf("get postings offset entry: %w", d.Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return values, ctx.Err()
|
|
|
|
@ -1542,12 +1541,12 @@ func (r *Reader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([
|
|
|
|
|
d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
|
|
|
|
|
buf := d.Get()
|
|
|
|
|
if d.Err() != nil {
|
|
|
|
|
return nil, errors.Wrap(d.Err(), "get buffer for series")
|
|
|
|
|
return nil, fmt.Errorf("get buffer for series: %w", d.Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
offsets, err := r.dec.LabelNamesOffsetsFor(buf)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "get label name offsets")
|
|
|
|
|
return nil, fmt.Errorf("get label name offsets: %w", err)
|
|
|
|
|
}
|
|
|
|
|
for _, off := range offsets {
|
|
|
|
|
offsetsMap[off] = struct{}{}
|
|
|
|
@ -1559,7 +1558,7 @@ func (r *Reader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([
|
|
|
|
|
for off := range offsetsMap {
|
|
|
|
|
name, err := r.lookupSymbol(ctx, off)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "lookup symbol in LabelNamesFor")
|
|
|
|
|
return nil, fmt.Errorf("lookup symbol in LabelNamesFor: %w", err)
|
|
|
|
|
}
|
|
|
|
|
names = append(names, name)
|
|
|
|
|
}
|
|
|
|
@ -1580,7 +1579,7 @@ func (r *Reader) LabelValueFor(ctx context.Context, id storage.SeriesRef, label
|
|
|
|
|
d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
|
|
|
|
|
buf := d.Get()
|
|
|
|
|
if d.Err() != nil {
|
|
|
|
|
return "", errors.Wrap(d.Err(), "label values for")
|
|
|
|
|
return "", fmt.Errorf("label values for: %w", d.Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
value, err := r.dec.LabelValueFor(ctx, buf, label)
|
|
|
|
@ -1607,7 +1606,11 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, ch
|
|
|
|
|
if d.Err() != nil {
|
|
|
|
|
return d.Err()
|
|
|
|
|
}
|
|
|
|
|
return errors.Wrap(r.dec.Series(d.Get(), builder, chks), "read series")
|
|
|
|
|
err := r.dec.Series(d.Get(), builder, chks)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("read series: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *Reader) Postings(ctx context.Context, name string, values ...string) (Postings, error) {
|
|
|
|
@ -1626,7 +1629,7 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
|
|
|
|
|
d := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
|
|
|
|
|
_, p, err := r.dec.Postings(d.Get())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "decode postings")
|
|
|
|
|
return nil, fmt.Errorf("decode postings: %w", err)
|
|
|
|
|
}
|
|
|
|
|
res = append(res, p)
|
|
|
|
|
}
|
|
|
|
@ -1688,7 +1691,7 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
|
|
|
|
|
d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
|
|
|
|
|
_, p, err := r.dec.Postings(d2.Get())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errors.Wrap(err, "decode postings")
|
|
|
|
|
return nil, fmt.Errorf("decode postings: %w", err)
|
|
|
|
|
}
|
|
|
|
|
res = append(res, p)
|
|
|
|
|
}
|
|
|
|
@ -1704,10 +1707,10 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if d.Err() != nil {
|
|
|
|
|
return nil, errors.Wrap(d.Err(), "get postings offset entry")
|
|
|
|
|
return nil, fmt.Errorf("get postings offset entry: %w", d.Err())
|
|
|
|
|
}
|
|
|
|
|
if ctx.Err() != nil {
|
|
|
|
|
return nil, errors.Wrap(ctx.Err(), "get postings offset entry")
|
|
|
|
|
return nil, fmt.Errorf("get postings offset entry: %w", ctx.Err())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1729,7 +1732,7 @@ func (r *Reader) Size() int64 {
|
|
|
|
|
// TODO(twilkie) implement support for matchers.
|
|
|
|
|
func (r *Reader) LabelNames(_ context.Context, matchers ...*labels.Matcher) ([]string, error) {
|
|
|
|
|
if len(matchers) > 0 {
|
|
|
|
|
return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers)
|
|
|
|
|
return nil, fmt.Errorf("matchers parameter is not implemented: %+v", matchers)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
labelNames := make([]string, 0, len(r.postings))
|
|
|
|
@ -1800,7 +1803,7 @@ func (dec *Decoder) LabelNamesOffsetsFor(b []byte) ([]uint32, error) {
|
|
|
|
|
_ = d.Uvarint() // skip the label value
|
|
|
|
|
|
|
|
|
|
if d.Err() != nil {
|
|
|
|
|
return nil, errors.Wrap(d.Err(), "read series label offsets")
|
|
|
|
|
return nil, fmt.Errorf("read series label offsets: %w", d.Err())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1817,18 +1820,18 @@ func (dec *Decoder) LabelValueFor(ctx context.Context, b []byte, label string) (
|
|
|
|
|
lvo := uint32(d.Uvarint())
|
|
|
|
|
|
|
|
|
|
if d.Err() != nil {
|
|
|
|
|
return "", errors.Wrap(d.Err(), "read series label offsets")
|
|
|
|
|
return "", fmt.Errorf("read series label offsets: %w", d.Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ln, err := dec.LookupSymbol(ctx, lno)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", errors.Wrap(err, "lookup label name")
|
|
|
|
|
return "", fmt.Errorf("lookup label name: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ln == label {
|
|
|
|
|
lv, err := dec.LookupSymbol(ctx, lvo)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", errors.Wrap(err, "lookup label value")
|
|
|
|
|
return "", fmt.Errorf("lookup label value: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return lv, nil
|
|
|
|
@ -1853,16 +1856,16 @@ func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chu
|
|
|
|
|
lvo := uint32(d.Uvarint())
|
|
|
|
|
|
|
|
|
|
if d.Err() != nil {
|
|
|
|
|
return errors.Wrap(d.Err(), "read series label offsets")
|
|
|
|
|
return fmt.Errorf("read series label offsets: %w", d.Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ln, err := dec.LookupSymbol(context.TODO(), lno)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "lookup label name")
|
|
|
|
|
return fmt.Errorf("lookup label name: %w", err)
|
|
|
|
|
}
|
|
|
|
|
lv, err := dec.LookupSymbol(context.TODO(), lvo)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "lookup label value")
|
|
|
|
|
return fmt.Errorf("lookup label value: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
builder.Add(ln, lv)
|
|
|
|
@ -1894,7 +1897,7 @@ func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chu
|
|
|
|
|
t0 = maxt
|
|
|
|
|
|
|
|
|
|
if d.Err() != nil {
|
|
|
|
|
return errors.Wrapf(d.Err(), "read meta for chunk %d", i)
|
|
|
|
|
return fmt.Errorf("read meta for chunk %d: %w", i, d.Err())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
*chks = append(*chks, chunks.Meta{
|
|
|
|
|