prometheus/index.go

995 lines
24 KiB
Go
Raw Normal View History

2017-04-10 18:59:45 +00:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-12-09 19:45:46 +00:00
package tsdb
import (
"bufio"
2016-12-09 20:23:34 +00:00
"encoding/binary"
"fmt"
"hash"
2016-12-09 19:45:46 +00:00
"io"
"math"
"os"
"path/filepath"
2016-12-09 20:23:34 +00:00
"sort"
"strings"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
2016-12-09 19:45:46 +00:00
)
const (
// MagicIndex 4 bytes at the head of an index file.
MagicIndex = 0xBAAAD700
indexFormatV1 = 1
size_unit = 4
)
const indexFilename = "index"
const compactionPageBytes = minSectorSize * 64
2016-12-09 19:45:46 +00:00
2017-04-26 06:33:54 +00:00
type indexWriterSeries struct {
labels labels.Labels
chunks []ChunkMeta // series file offset of chunks
offset uint32 // index file offset of series reference
2017-04-26 06:33:54 +00:00
}
type indexWriterSeriesSlice []*indexWriterSeries
func (s indexWriterSeriesSlice) Len() int { return len(s) }
func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s indexWriterSeriesSlice) Less(i, j int) bool {
return labels.Compare(s[i].labels, s[j].labels) < 0
}
type indexWriterStage uint8
const (
idxStageNone indexWriterStage = iota
idxStageSymbols
idxStageSeries
2017-04-26 06:33:54 +00:00
idxStageLabelIndex
idxStagePostings
idxStageDone
)
func (s indexWriterStage) String() string {
switch s {
case idxStageNone:
return "none"
case idxStageSymbols:
return "symbols"
case idxStageSeries:
return "series"
2017-04-26 06:33:54 +00:00
case idxStageLabelIndex:
return "label index"
case idxStagePostings:
return "postings"
case idxStageDone:
return "done"
}
return "<unknown>"
}
2017-04-28 13:41:42 +00:00
// IndexWriter serializes the index for a block of series data.
// The methods must be called in the order they are specified in.
2016-12-09 19:45:46 +00:00
type IndexWriter interface {
// AddSymbols registers all string symbols that are encountered in series
// and other indices.
AddSymbols(sym map[string]struct{}) error
2017-04-28 13:41:42 +00:00
// AddSeries populates the index writer with a series and its offsets
2016-12-10 08:44:00 +00:00
// of chunks that the index can reference.
// Implementations may require series to be insert in increasing order by
// their labels.
// The reference numbers are used to resolve entries in postings lists that
// are added later.
2017-09-04 14:08:38 +00:00
AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error
2016-12-09 19:45:46 +00:00
// WriteLabelIndex serializes an index from label names to values.
// The passed in values chained tuples of strings of the length of names.
WriteLabelIndex(names []string, values []string) error
// WritePostings writes a postings list for a single label pair.
// The Postings here contain refs to the series that were added.
2016-12-14 20:58:29 +00:00
WritePostings(name, value string, it Postings) error
2016-12-09 19:45:46 +00:00
// Close writes any finalization and closes the resources associated with
2016-12-09 19:45:46 +00:00
// the underlying writer.
Close() error
}
// indexWriter implements the IndexWriter interface for the standard
// serialization format.
type indexWriter struct {
f *os.File
fbuf *bufio.Writer
pos uint64
toc indexTOC
2017-04-26 06:33:54 +00:00
stage indexWriterStage
2016-12-09 19:45:46 +00:00
2017-03-02 22:35:02 +00:00
// Reusable memory.
buf1 encbuf
buf2 encbuf
2017-03-02 22:35:02 +00:00
uint32s []uint32
symbols map[string]uint32 // symbol offsets
2017-09-04 14:08:38 +00:00
seriesOffsets map[uint64]uint64 // offsets of series
labelIndexes []hashEntry // label index offsets
postings []hashEntry // postings lists offsets
// Hold last series to validate that clients insert new series in order.
lastSeries labels.Labels
crc32 hash.Hash
2016-12-09 20:23:34 +00:00
}
type indexTOC struct {
symbols uint64
series uint64
labelIndices uint64
labelIndicesTable uint64
postings uint64
postingsTable uint64
}
func newIndexWriter(dir string) (*indexWriter, error) {
df, err := fileutil.OpenDir(dir)
if err != nil {
return nil, err
}
defer df.Close() // close for flatform windows
f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, err
}
if err := fileutil.Fsync(df); err != nil {
return nil, errors.Wrap(err, "sync dir")
}
iw := &indexWriter{
2017-04-26 06:33:54 +00:00
f: f,
fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0,
stage: idxStageNone,
2017-03-02 22:35:02 +00:00
// Reusable memory.
buf1: encbuf{b: make([]byte, 0, 1<<22)},
buf2: encbuf{b: make([]byte, 0, 1<<22)},
2017-03-02 22:35:02 +00:00
uint32s: make([]uint32, 0, 1<<15),
// Caches.
symbols: make(map[string]uint32, 1<<13),
2017-09-04 14:08:38 +00:00
seriesOffsets: make(map[uint64]uint64, 1<<16),
crc32: newCRC32(),
2016-12-09 20:23:34 +00:00
}
if err := iw.writeMeta(); err != nil {
return nil, err
}
return iw, nil
2016-12-09 20:23:34 +00:00
}
func (w *indexWriter) write(bufs ...[]byte) error {
for _, b := range bufs {
n, err := w.fbuf.Write(b)
w.pos += uint64(n)
if err != nil {
return err
}
// For now the index file must not grow beyond 4GiB. Some of the fixed-sized
// offset references in v1 are only 4 bytes large.
// Once we move to compressed/varint representations in those areas, this limitation
// can be lifted.
if w.pos > math.MaxUint32 {
return errors.Errorf("exceeding max size of 4GiB")
}
}
return nil
2016-12-09 20:23:34 +00:00
}
// addPadding adds zero byte padding until the file size is a multiple size_unit.
func (w *indexWriter) addPadding() error {
p := w.pos % size_unit
if p == 0 {
return nil
}
p = size_unit - p
return errors.Wrap(w.write(make([]byte, p)), "add padding")
}
// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
func (w *indexWriter) ensureStage(s indexWriterStage) error {
if w.stage == s {
return nil
}
if w.stage > s {
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
}
// Mark start of sections in table of contents.
switch s {
case idxStageSymbols:
w.toc.symbols = w.pos
case idxStageSeries:
w.toc.series = w.pos
case idxStageLabelIndex:
w.toc.labelIndices = w.pos
case idxStagePostings:
w.toc.postings = w.pos
case idxStageDone:
w.toc.labelIndicesTable = w.pos
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
return err
}
w.toc.postingsTable = w.pos
if err := w.writeOffsetTable(w.postings); err != nil {
return err
}
if err := w.writeTOC(); err != nil {
return err
}
}
w.stage = s
return nil
}
func (w *indexWriter) writeMeta() error {
w.buf1.reset()
w.buf1.putBE32(MagicIndex)
w.buf1.putByte(indexFormatV1)
2016-12-09 20:23:34 +00:00
return w.write(w.buf1.get())
2016-12-09 19:45:46 +00:00
}
2017-09-04 14:08:38 +00:00
func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkMeta) error {
if err := w.ensureStage(idxStageSeries); err != nil {
return err
}
if labels.Compare(lset, w.lastSeries) <= 0 {
return errors.Errorf("out-of-order series added with label set %q", lset)
}
if _, ok := w.seriesOffsets[ref]; ok {
return errors.Errorf("series with reference %d already added", ref)
}
w.seriesOffsets[ref] = w.pos
w.buf2.reset()
w.buf2.putUvarint(len(lset))
2016-12-09 20:23:34 +00:00
for _, l := range lset {
offset, ok := w.symbols[l.Name]
if !ok {
return errors.Errorf("symbol entry for %q does not exist", l.Name)
}
w.buf2.putUvarint32(offset)
offset, ok = w.symbols[l.Value]
if !ok {
return errors.Errorf("symbol entry for %q does not exist", l.Value)
}
w.buf2.putUvarint32(offset)
2016-12-09 20:23:34 +00:00
}
2016-12-10 08:44:00 +00:00
w.buf2.putUvarint(len(chunks))
if len(chunks) > 0 {
c := chunks[0]
w.buf2.putVarint64(c.MinTime)
w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime))
w.buf2.putUvarint64(c.Ref)
t0 := c.MaxTime
ref0 := int64(c.Ref)
for _, c := range chunks[1:] {
w.buf2.putUvarint64(uint64(c.MinTime - t0))
w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime))
t0 = c.MaxTime
w.buf2.putVarint64(int64(c.Ref) - ref0)
ref0 = int64(c.Ref)
}
}
w.buf1.reset()
w.buf1.putUvarint(w.buf2.len())
w.buf2.putHash(w.crc32)
if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
return errors.Wrap(err, "write series data")
2016-12-10 08:44:00 +00:00
}
w.lastSeries = append(w.lastSeries[:0], lset...)
return nil
2016-12-09 19:45:46 +00:00
}
func (w *indexWriter) AddSymbols(sym map[string]struct{}) error {
if err := w.ensureStage(idxStageSymbols); err != nil {
return err
}
2016-12-09 20:23:34 +00:00
// Generate sorted list of strings we will store as reference table.
symbols := make([]string, 0, len(sym))
for s := range sym {
2016-12-09 20:23:34 +00:00
symbols = append(symbols, s)
}
sort.Strings(symbols)
const headerSize = 4
w.buf1.reset()
w.buf2.reset()
2016-12-09 20:23:34 +00:00
w.buf2.putBE32int(len(symbols))
w.symbols = make(map[string]uint32, len(symbols))
2016-12-09 20:23:34 +00:00
for _, s := range symbols {
w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
w.buf2.putUvarintStr(s)
}
2016-12-09 20:23:34 +00:00
w.buf1.putBE32int(w.buf2.len())
w.buf2.putHash(w.crc32)
2017-04-25 14:45:44 +00:00
err := w.write(w.buf1.get(), w.buf2.get())
return errors.Wrap(err, "write symbols")
2016-12-09 21:27:43 +00:00
}
2016-12-09 19:45:46 +00:00
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
if len(values)%len(names) != 0 {
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
}
2017-04-26 06:33:54 +00:00
if err := w.ensureStage(idxStageLabelIndex); err != nil {
return errors.Wrap(err, "ensure stage")
2017-01-19 13:01:38 +00:00
}
valt, err := newStringTuples(values, len(names))
if err != nil {
return err
2016-12-09 20:40:38 +00:00
}
sort.Sort(valt)
2016-12-09 20:40:38 +00:00
// Align beginning to 4 bytes for more efficient index list scans.
if err := w.addPadding(); err != nil {
return err
}
w.labelIndexes = append(w.labelIndexes, hashEntry{
keys: names,
offset: w.pos,
})
w.buf2.reset()
w.buf2.putBE32int(len(names))
w.buf2.putBE32int(valt.Len())
2016-12-09 20:40:38 +00:00
2017-04-25 14:45:44 +00:00
for _, v := range valt.s {
offset, ok := w.symbols[v]
if !ok {
return errors.Errorf("symbol entry for %q does not exist", v)
}
w.buf2.putBE32(offset)
2017-04-25 14:45:44 +00:00
}
w.buf1.reset()
w.buf1.putBE32int(w.buf2.len())
2017-04-25 14:45:44 +00:00
w.buf2.putHash(w.crc32)
err = w.write(w.buf1.get(), w.buf2.get())
return errors.Wrap(err, "write label index")
2016-12-09 19:45:46 +00:00
}
// writeOffsetTable writes a sequence of readable hash entries.
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
w.buf2.reset()
w.buf2.putBE32int(len(entries))
for _, e := range entries {
w.buf2.putUvarint(len(e.keys))
for _, k := range e.keys {
w.buf2.putUvarintStr(k)
}
w.buf2.putUvarint64(e.offset)
}
w.buf1.reset()
w.buf1.putBE32int(w.buf2.len())
w.buf2.putHash(w.crc32)
return w.write(w.buf1.get(), w.buf2.get())
}
const indexTOCLen = 6*8 + 4
func (w *indexWriter) writeTOC() error {
w.buf1.reset()
w.buf1.putBE64(w.toc.symbols)
w.buf1.putBE64(w.toc.series)
w.buf1.putBE64(w.toc.labelIndices)
w.buf1.putBE64(w.toc.labelIndicesTable)
w.buf1.putBE64(w.toc.postings)
w.buf1.putBE64(w.toc.postingsTable)
w.buf1.putHash(w.crc32)
return w.write(w.buf1.get())
}
2016-12-14 20:58:29 +00:00
func (w *indexWriter) WritePostings(name, value string, it Postings) error {
2017-04-26 06:33:54 +00:00
if err := w.ensureStage(idxStagePostings); err != nil {
return errors.Wrap(err, "ensure stage")
}
// Align beginning to 4 bytes for more efficient postings list scans.
if err := w.addPadding(); err != nil {
return err
}
2016-12-10 08:44:00 +00:00
w.postings = append(w.postings, hashEntry{
keys: []string{name, value},
offset: w.pos,
2016-12-10 08:44:00 +00:00
})
// Order of the references in the postings list does not imply order
// of the series references within the persisted block they are mapped to.
// We have to sort the new references again.
2017-03-02 22:35:02 +00:00
refs := w.uint32s[:0]
2016-12-10 08:44:00 +00:00
for it.Next() {
offset, ok := w.seriesOffsets[it.At()]
if !ok {
return errors.Errorf("%p series for reference %d not found", w, it.At())
}
2017-09-04 14:08:38 +00:00
if offset > (1<<32)-1 {
return errors.Errorf("series offset %d exceeds 4 bytes", offset)
}
refs = append(refs, uint32(offset))
}
if err := it.Err(); err != nil {
return err
}
2017-03-02 22:35:02 +00:00
sort.Sort(uint32slice(refs))
w.buf2.reset()
w.buf2.putBE32int(len(refs))
for _, r := range refs {
w.buf2.putBE32(r)
2016-12-10 08:44:00 +00:00
}
w.uint32s = refs
2016-12-10 08:44:00 +00:00
w.buf1.reset()
w.buf1.putBE32int(w.buf2.len())
2017-04-25 14:45:44 +00:00
w.buf2.putHash(w.crc32)
2017-04-25 14:45:44 +00:00
err := w.write(w.buf1.get(), w.buf2.get())
return errors.Wrap(err, "write postings")
2016-12-09 19:45:46 +00:00
}
2017-03-02 22:35:02 +00:00
type uint32slice []uint32
func (s uint32slice) Len() int { return len(s) }
func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
type hashEntry struct {
keys []string
offset uint64
}
2016-12-09 19:45:46 +00:00
func (w *indexWriter) Close() error {
if err := w.ensureStage(idxStageDone); err != nil {
2017-01-09 17:34:29 +00:00
return err
}
2017-04-25 14:45:44 +00:00
if err := w.fbuf.Flush(); err != nil {
return err
}
if err := fileutil.Fsync(w.f); err != nil {
return err
}
return w.f.Close()
2016-12-09 19:45:46 +00:00
}
// IndexReader provides reading access of serialized index data.
type IndexReader interface {
// Symbols returns a set of string symbols that may occur in series' labels
// and indices.
Symbols() (map[string]struct{}, error)
// LabelValues returns the possible label values
LabelValues(names ...string) (StringTuples, error)
// Postings returns the postings list iterator for the label pair.
// The Postings here contain the offsets to the series inside the index.
// Found IDs are not strictly required to point to a valid Series, e.g. during
// background garbage collections.
Postings(name, value string) (Postings, error)
// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
SortedPostings(Postings) Postings
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// Returns ErrNotFound if the ref does not resolve to a known series.
2017-09-04 14:08:38 +00:00
Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error
// LabelIndices returns the label pairs for which indices exist.
LabelIndices() ([][]string, error)
// Close released the underlying resources of the reader.
Close() error
}
// StringTuples provides access to a sorted list of string tuples.
type StringTuples interface {
// Total number of tuples in the list.
Len() int
// At returns the tuple at position i.
At(i int) ([]string, error)
}
type indexReader struct {
// The underlying byte slice holding the encoded series data.
b ByteSlice
toc indexTOC
// Close that releases the underlying resources of the byte slice.
c io.Closer
// Cached hashmaps of section offsets.
labels map[string]uint32
postings map[string]uint32
// Cache of read symbols. Strings that are returned when reading from the
// block are always backed by true strings held in here rather than
// strings that are backed by byte slices from the mmap'd index file. This
// prevents memory faults when applications work with read symbols after
// the block has been unmapped.
symbols map[uint32]string
crc32 hash.Hash32
}
var (
2017-10-25 22:12:13 +00:00
errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag")
errInvalidChecksum = fmt.Errorf("invalid checksum")
)
// ByteSlice abstracts a byte slice.
type ByteSlice interface {
Len() int
Range(start, end int) []byte
}
type realByteSlice []byte
func (b realByteSlice) Len() int {
return len(b)
}
func (b realByteSlice) Range(start, end int) []byte {
return b[start:end]
}
func (b realByteSlice) Sub(start, end int) ByteSlice {
return b[start:end]
}
2017-11-10 13:23:14 +00:00
// NewIndexReader returns a new IndexReader on the given byte slice.
func NewIndexReader(b ByteSlice) (IndexReader, error) {
return newIndexReader(b, nil)
}
2017-11-10 13:23:14 +00:00
// NewFileIndexReader returns a new index reader against the given index file.
func NewFileIndexReader(path string) (IndexReader, error) {
f, err := openMmapFile(path)
if err != nil {
return nil, err
}
return newIndexReader(realByteSlice(f.b), f)
}
func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) {
r := &indexReader{
b: b,
c: c,
symbols: map[uint32]string{},
crc32: newCRC32(),
}
// Verify magic number.
if b.Len() < 4 {
return nil, errors.Wrap(errInvalidSize, "index header")
}
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
return nil, errors.Errorf("invalid magic number %x", m)
}
if err := r.readTOC(); err != nil {
return nil, errors.Wrap(err, "read TOC")
}
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
return nil, errors.Wrap(err, "read symbols")
}
var err error
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
if err != nil {
return nil, errors.Wrap(err, "read label index table")
}
r.postings, err = r.readOffsetTable(r.toc.postingsTable)
return r, errors.Wrap(err, "read postings table")
}
func (r *indexReader) readTOC() error {
if r.b.Len() < indexTOCLen {
return errInvalidSize
}
b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len())
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
d := decbuf{b: b[:len(b)-4]}
if d.crc32() != expCRC {
2017-11-10 13:23:14 +00:00
return errInvalidChecksum
2017-10-25 22:12:13 +00:00
}
r.toc.symbols = d.be64()
r.toc.series = d.be64()
r.toc.labelIndices = d.be64()
r.toc.labelIndicesTable = d.be64()
r.toc.postings = d.be64()
r.toc.postingsTable = d.be64()
return d.err()
}
// decbufAt returns a new decoding buffer. It expects the first 4 bytes
// after offset to hold the big endian encoded content length, followed by the contents and the expected
// checksum.
func (r *indexReader) decbufAt(off int) decbuf {
if r.b.Len() < off+4 {
return decbuf{e: errInvalidSize}
}
b := r.b.Range(off, off+4)
l := int(binary.BigEndian.Uint32(b))
if r.b.Len() < off+4+l+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+4, off+4+l+4)
dec := decbuf{b: b[:len(b)-4]}
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
return decbuf{e: errInvalidChecksum}
}
return dec
}
2017-11-10 13:23:14 +00:00
// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
// checksum.
func (r *indexReader) decbufUvarintAt(off int) decbuf {
// We never have to access this method at the far end of the byte slice. Thus just checking
// against the MaxVarintLen32 is sufficient.
if r.b.Len() < off+binary.MaxVarintLen32 {
return decbuf{e: errInvalidSize}
}
b := r.b.Range(off, off+binary.MaxVarintLen32)
l, n := binary.Uvarint(b)
if n > binary.MaxVarintLen32 {
return decbuf{e: errors.New("invalid uvarint")}
}
if r.b.Len() < off+n+int(l)+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+n, off+n+int(l)+4)
dec := decbuf{b: b[:len(b)-4]}
if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
return decbuf{e: errInvalidChecksum}
}
return dec
}
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
// after the reader is closed.
func (r *indexReader) readSymbols(off int) error {
if off == 0 {
return nil
}
d := r.decbufAt(off)
var (
origLen = d.len()
cnt = d.be32int()
basePos = uint32(off) + 4
nextPos = basePos + uint32(origLen-d.len())
)
for d.err() == nil && d.len() > 0 && cnt > 0 {
s := d.uvarintStr()
r.symbols[uint32(nextPos)] = s
nextPos = basePos + uint32(origLen-d.len())
cnt--
}
return d.err()
}
// readOffsetTable reads an offset table at the given position and returns a map
// with the key strings concatenated by the 0xff unicode non-character.
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
const sep = "\xff"
d := r.decbufAt(int(off))
cnt := d.be32()
res := make(map[string]uint32, cnt)
for d.err() == nil && d.len() > 0 && cnt > 0 {
keyCount := int(d.uvarint())
keys := make([]string, 0, keyCount)
for i := 0; i < keyCount; i++ {
keys = append(keys, d.uvarintStr())
}
res[strings.Join(keys, sep)] = uint32(d.uvarint())
cnt--
}
return res, d.err()
}
func (r *indexReader) Close() error {
return r.c.Close()
}
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
s, ok := r.symbols[o]
if !ok {
return "", errors.Errorf("unknown symbol offset %d", o)
}
return s, nil
}
func (r *indexReader) Symbols() (map[string]struct{}, error) {
res := make(map[string]struct{}, len(r.symbols))
for _, s := range r.symbols {
res[s] = struct{}{}
}
return res, nil
}
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
const sep = "\xff"
key := strings.Join(names, sep)
off, ok := r.labels[key]
if !ok {
// XXX(fabxc): hot fix. Should return a partial data error and handle cases
// where the entire block has no data gracefully.
return emptyStringTuples{}, nil
//return nil, fmt.Errorf("label index doesn't exist")
}
d := r.decbufAt(int(off))
nc := d.be32int()
d.be32() // consume unused value entry count.
if d.err() != nil {
return nil, errors.Wrap(d.err(), "read label value index")
}
st := &serializedStringTuples{
l: nc,
b: d.get(),
lookup: r.lookupSymbol,
}
return st, nil
}
type emptyStringTuples struct{}
func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil }
func (emptyStringTuples) Len() int { return 0 }
func (r *indexReader) LabelIndices() ([][]string, error) {
const sep = "\xff"
res := [][]string{}
for s := range r.labels {
res = append(res, strings.Split(s, sep))
}
return res, nil
}
2017-09-04 14:08:38 +00:00
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
d := r.decbufUvarintAt(int(ref))
2017-04-25 14:45:44 +00:00
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]
k := int(d.uvarint())
for i := 0; i < k; i++ {
lno := uint32(d.uvarint())
lvo := uint32(d.uvarint())
if d.err() != nil {
return errors.Wrap(d.err(), "read series label offsets")
}
ln, err := r.lookupSymbol(lno)
if err != nil {
return errors.Wrap(err, "lookup label name")
}
lv, err := r.lookupSymbol(lvo)
if err != nil {
return errors.Wrap(err, "lookup label value")
}
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
// Read the chunks meta data.
k = int(d.uvarint())
if k == 0 {
return nil
}
t0 := d.varint64()
maxt := int64(d.uvarint64()) + t0
ref0 := int64(d.uvarint64())
*chks = append(*chks, ChunkMeta{
Ref: uint64(ref0),
MinTime: t0,
MaxTime: maxt,
})
t0 = maxt
for i := 1; i < k; i++ {
mint := int64(d.uvarint64()) + t0
maxt := int64(d.uvarint64()) + mint
ref0 += d.varint64()
t0 = maxt
if d.err() != nil {
return errors.Wrapf(d.err(), "read meta for chunk %d", i)
}
2017-04-25 14:45:44 +00:00
*chks = append(*chks, ChunkMeta{
Ref: uint64(ref0),
MinTime: mint,
MaxTime: maxt,
})
}
return d.err()
}
func (r *indexReader) Postings(name, value string) (Postings, error) {
const sep = "\xff"
key := strings.Join([]string{name, value}, sep)
off, ok := r.postings[key]
if !ok {
2017-04-05 12:14:30 +00:00
return emptyPostings, nil
}
d := r.decbufAt(int(off))
d.be32() // consume unused postings list length.
return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes")
}
func (r *indexReader) SortedPostings(p Postings) Postings {
return p
}
type stringTuples struct {
l int // tuple length
s []string // flattened tuple entries
}
func newStringTuples(s []string, l int) (*stringTuples, error) {
if len(s)%l != 0 {
return nil, errors.Wrap(errInvalidSize, "string tuple list")
}
return &stringTuples{s: s, l: l}, nil
}
func (t *stringTuples) Len() int { return len(t.s) / t.l }
func (t *stringTuples) At(i int) ([]string, error) { return t.s[i : i+t.l], nil }
func (t *stringTuples) Swap(i, j int) {
c := make([]string, t.l)
copy(c, t.s[i:i+t.l])
for k := 0; k < t.l; k++ {
t.s[i+k] = t.s[j+k]
t.s[j+k] = c[k]
}
}
func (t *stringTuples) Less(i, j int) bool {
for k := 0; k < t.l; k++ {
d := strings.Compare(t.s[i+k], t.s[j+k])
if d < 0 {
return true
}
if d > 0 {
return false
}
}
return false
}
type serializedStringTuples struct {
l int
b []byte
lookup func(uint32) (string, error)
}
func (t *serializedStringTuples) Len() int {
return len(t.b) / (4 * t.l)
}
func (t *serializedStringTuples) At(i int) ([]string, error) {
if len(t.b) < (i+t.l)*4 {
return nil, errInvalidSize
}
res := make([]string, 0, t.l)
for k := 0; k < t.l; k++ {
offset := binary.BigEndian.Uint32(t.b[(i+k)*4:])
s, err := t.lookup(offset)
if err != nil {
return nil, errors.Wrap(err, "symbol lookup")
}
res = append(res, s)
}
return res, nil
}