From a3d042b54e2103d2cf2a4dc31e73c05b6326284f Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sat, 18 Feb 2017 17:33:20 +0100 Subject: [PATCH] Support multiple chunk files in read path --- block.go | 4 ++-- head.go | 13 ++++++++----- querier.go | 2 +- reader.go | 54 +++++++++++++++++++++++++++++++----------------------- writer.go | 4 ++-- 5 files changed, 44 insertions(+), 33 deletions(-) diff --git a/block.go b/block.go index ca7c48f46..35548f62e 100644 --- a/block.go +++ b/block.go @@ -130,11 +130,11 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { return nil, errors.Wrap(err, "open index file") } - sr, err := newSeriesReader(chunksf.b) + sr, err := newSeriesReader([][]byte{chunksf.b}) if err != nil { return nil, errors.Wrap(err, "create series reader") } - ir, err := newIndexReader(sr, indexf.b) + ir, err := newIndexReader(indexf.b) if err != nil { return nil, errors.Wrap(err, "create index reader") } diff --git a/head.go b/head.go index 33bcb7088..d9ab2944c 100644 --- a/head.go +++ b/head.go @@ -364,14 +364,17 @@ type headSeriesReader struct { } // Chunk returns the chunk for the reference number. -func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) { +func (h *headSeriesReader) Chunk(ref uint64) (chunks.Chunk, error) { h.mtx.RLock() defer h.mtx.RUnlock() + si := ref >> 32 + ci := (ref << 32) >> 32 + c := &safeChunk{ - Chunk: h.series[ref>>8].chunks[int((ref<<24)>>24)].chunk, - s: h.series[ref>>8], - i: int((ref << 24) >> 24), + Chunk: h.series[si].chunks[ci].chunk, + s: h.series[si], + i: int(ci), } return c, nil } @@ -440,7 +443,7 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) metas = append(metas, ChunkMeta{ MinTime: c.minTime, MaxTime: c.maxTime, - Ref: (ref << 8) | uint32(i), + Ref: (uint64(ref) << 32) | uint64(i), }) } diff --git a/querier.go b/querier.go index df17ad270..c09bab885 100644 --- a/querier.go +++ b/querier.go @@ -491,7 +491,7 @@ type chunkSeries struct { // chunk is a function that retrieves chunks based on a reference // number contained in the chunk meta information. - chunk func(ref uint32) (chunks.Chunk, error) + chunk func(ref uint64) (chunks.Chunk, error) } func (s *chunkSeries) Labels() labels.Labels { diff --git a/reader.go b/reader.go index a49fb794b..630c11a99 100644 --- a/reader.go +++ b/reader.go @@ -13,32 +13,45 @@ import ( // SeriesReader provides reading access of serialized time series data. type SeriesReader interface { // Chunk returns the series data chunk with the given reference. - Chunk(ref uint32) (chunks.Chunk, error) + Chunk(ref uint64) (chunks.Chunk, error) } // seriesReader implements a SeriesReader for a serialized byte stream // of series data. type seriesReader struct { - // The underlying byte slice holding the encoded series data. - b []byte + // The underlying bytes holding the encoded series data. + bs [][]byte } -func newSeriesReader(b []byte) (*seriesReader, error) { - if len(b) < 4 { - return nil, errors.Wrap(errInvalidSize, "index header") +func newSeriesReader(bs [][]byte) (*seriesReader, error) { + s := &seriesReader{bs: bs} + + for i, b := range bs { + if len(b) < 4 { + return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) + } + // Verify magic number. + if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries { + return nil, fmt.Errorf("invalid magic number %x", m) + } } - // Verify magic number. - if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries { - return nil, fmt.Errorf("invalid magic number %x", m) - } - return &seriesReader{b: b}, nil + return s, nil } -func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) { - if int(offset) > len(s.b) { - return nil, errors.Errorf("offset %d beyond data size %d", offset, len(s.b)) +func (s *seriesReader) Chunk(ref uint64) (chunks.Chunk, error) { + var ( + seq = int(ref >> 32) + off = int((ref << 32) >> 32) + ) + if seq >= len(s.bs) { + return nil, errors.Errorf("reference sequence %d out of range", seq) } - b := s.b[offset:] + b := s.bs[seq] + + if int(off) >= len(b) { + return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) + } + b = b[off:] l, n := binary.Uvarint(b) if n < 0 { @@ -78,8 +91,6 @@ type StringTuples interface { } type indexReader struct { - series SeriesReader - // The underlying byte slice holding the encoded series data. b []byte @@ -93,14 +104,11 @@ var ( errInvalidFlag = fmt.Errorf("invalid flag") ) -func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) { +func newIndexReader(b []byte) (*indexReader, error) { if len(b) < 4 { return nil, errors.Wrap(errInvalidSize, "index header") } - r := &indexReader{ - series: s, - b: b, - } + r := &indexReader{b: b} // Verify magic number. if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex { @@ -299,7 +307,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { b = b[n:] chunks = append(chunks, ChunkMeta{ - Ref: uint32(o), + Ref: o, MinTime: firstTime, MaxTime: lastTime, }) diff --git a/writer.go b/writer.go index 86bb75274..2229ff02a 100644 --- a/writer.go +++ b/writer.go @@ -103,7 +103,7 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkM for i := range chks { chk := &chks[i] - chk.Ref = uint32(w.n) + chk.Ref = uint64(w.n) n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes()))) @@ -148,7 +148,7 @@ type ChunkMeta struct { // Ref and Chunk hold either a reference that can be used to retrieve // chunk data or the data itself. // Generally, only one of them is set. - Ref uint32 + Ref uint64 Chunk chunks.Chunk MinTime, MaxTime int64 // time range the data covers