mirror of https://github.com/prometheus/prometheus
Merge pull request #118 from prometheus/allocsave2
Avoid chunk allocations and refactor compactionspull/5805/head
commit
f9f439c6db
9
block.go
9
block.go
|
@ -22,6 +22,7 @@ import (
|
||||||
|
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"github.com/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -112,7 +113,7 @@ type BlockStats struct {
|
||||||
type BlockMetaCompaction struct {
|
type BlockMetaCompaction struct {
|
||||||
// Maximum number of compaction cycles any source block has
|
// Maximum number of compaction cycles any source block has
|
||||||
// gone through.
|
// gone through.
|
||||||
Generation int `json:"generation"`
|
Level int `json:"level"`
|
||||||
// ULIDs of all source head blocks that went into the block.
|
// ULIDs of all source head blocks that went into the block.
|
||||||
Sources []ulid.ULID `json:"sources,omitempty"`
|
Sources []ulid.ULID `json:"sources,omitempty"`
|
||||||
}
|
}
|
||||||
|
@ -181,13 +182,13 @@ type persistedBlock struct {
|
||||||
tombstones tombstoneReader
|
tombstones tombstoneReader
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPersistedBlock(dir string) (*persistedBlock, error) {
|
func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
|
||||||
meta, err := readMetaFile(dir)
|
meta, err := readMetaFile(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cr, err := newChunkReader(chunkDir(dir))
|
cr, err := newChunkReader(chunkDir(dir), pool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -252,7 +253,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
stones := map[uint32]intervals{}
|
stones := map[uint32]intervals{}
|
||||||
|
|
||||||
var lset labels.Labels
|
var lset labels.Labels
|
||||||
var chks []*ChunkMeta
|
var chks []ChunkMeta
|
||||||
|
|
||||||
Outer:
|
Outer:
|
||||||
for p.Next() {
|
for p.Next() {
|
||||||
|
|
41
chunks.go
41
chunks.go
|
@ -100,7 +100,7 @@ type ChunkWriter interface {
|
||||||
// must be populated.
|
// must be populated.
|
||||||
// After returning successfully, the Ref fields in the ChunkMetas
|
// After returning successfully, the Ref fields in the ChunkMetas
|
||||||
// are set and can be used to retrieve the chunks from the written data.
|
// are set and can be used to retrieve the chunks from the written data.
|
||||||
WriteChunks(chunks ...*ChunkMeta) error
|
WriteChunks(chunks ...ChunkMeta) error
|
||||||
|
|
||||||
// Close writes any required finalization and closes the resources
|
// Close writes any required finalization and closes the resources
|
||||||
// associated with the underlying writer.
|
// associated with the underlying writer.
|
||||||
|
@ -222,7 +222,7 @@ func (w *chunkWriter) write(b []byte) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
|
func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
|
||||||
// Calculate maximum space we need and cut a new segment in case
|
// Calculate maximum space we need and cut a new segment in case
|
||||||
// we don't fit into the current one.
|
// we don't fit into the current one.
|
||||||
maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
|
maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
|
||||||
|
@ -238,23 +238,22 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
b := make([]byte, binary.MaxVarintLen32)
|
var (
|
||||||
n := binary.PutUvarint(b, uint64(len(chks)))
|
b = [binary.MaxVarintLen32]byte{}
|
||||||
|
seq = uint64(w.seq()) << 32
|
||||||
|
)
|
||||||
|
for i := range chks {
|
||||||
|
chk := &chks[i]
|
||||||
|
|
||||||
if err := w.write(b[:n]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
seq := uint64(w.seq()) << 32
|
|
||||||
|
|
||||||
for _, chk := range chks {
|
|
||||||
chk.Ref = seq | uint64(w.n)
|
chk.Ref = seq | uint64(w.n)
|
||||||
|
|
||||||
n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
|
n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
|
||||||
|
|
||||||
if err := w.write(b[:n]); err != nil {
|
if err := w.write(b[:n]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil {
|
b[0] = byte(chk.Chunk.Encoding())
|
||||||
|
if err := w.write(b[:1]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := w.write(chk.Chunk.Bytes()); err != nil {
|
if err := w.write(chk.Chunk.Bytes()); err != nil {
|
||||||
|
@ -265,7 +264,7 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
|
||||||
if err := chk.writeHash(w.crc32); err != nil {
|
if err := chk.writeHash(w.crc32); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := w.write(w.crc32.Sum(nil)); err != nil {
|
if err := w.write(w.crc32.Sum(b[:0])); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -298,15 +297,20 @@ type chunkReader struct {
|
||||||
|
|
||||||
// Closers for resources behind the byte slices.
|
// Closers for resources behind the byte slices.
|
||||||
cs []io.Closer
|
cs []io.Closer
|
||||||
|
|
||||||
|
pool chunks.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
|
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
|
||||||
func newChunkReader(dir string) (*chunkReader, error) {
|
func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
|
||||||
files, err := sequenceFiles(dir, "")
|
files, err := sequenceFiles(dir, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var cr chunkReader
|
if pool == nil {
|
||||||
|
pool = chunks.NewPool()
|
||||||
|
}
|
||||||
|
cr := chunkReader{pool: pool}
|
||||||
|
|
||||||
for _, fn := range files {
|
for _, fn := range files {
|
||||||
f, err := openMmapFile(fn)
|
f, err := openMmapFile(fn)
|
||||||
|
@ -353,11 +357,6 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
||||||
return nil, fmt.Errorf("reading chunk length failed")
|
return nil, fmt.Errorf("reading chunk length failed")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
b = b[n:]
|
||||||
enc := chunks.Encoding(b[0])
|
|
||||||
|
|
||||||
c, err := chunks.FromData(enc, b[1:1+l])
|
return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l])
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return c, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,12 @@
|
||||||
|
|
||||||
package chunks
|
package chunks
|
||||||
|
|
||||||
import "fmt"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
// Encoding is the identifier for a chunk encoding.
|
// Encoding is the identifier for a chunk encoding.
|
||||||
type Encoding uint8
|
type Encoding uint8
|
||||||
|
@ -63,3 +68,53 @@ type Iterator interface {
|
||||||
Err() error
|
Err() error
|
||||||
Next() bool
|
Next() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Pool interface {
|
||||||
|
Put(Chunk) error
|
||||||
|
Get(e Encoding, b []byte) (Chunk, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pool is a memory pool of chunk objects.
|
||||||
|
type pool struct {
|
||||||
|
xor sync.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPool() Pool {
|
||||||
|
return &pool{
|
||||||
|
xor: sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return &XORChunk{b: &bstream{}}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
|
||||||
|
switch e {
|
||||||
|
case EncXOR:
|
||||||
|
c := p.xor.Get().(*XORChunk)
|
||||||
|
c.b.stream = b
|
||||||
|
c.b.count = 0
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
return nil, errors.Errorf("invalid encoding %q", e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pool) Put(c Chunk) error {
|
||||||
|
switch c.Encoding() {
|
||||||
|
case EncXOR:
|
||||||
|
xc, ok := c.(*XORChunk)
|
||||||
|
// This may happen often with wrapped chunks. Nothing we can really do about
|
||||||
|
// it but returning an error would cause a lot of allocations again. Thus,
|
||||||
|
// we just skip it.
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
xc.b.stream = nil
|
||||||
|
xc.b.count = 0
|
||||||
|
p.xor.Put(c)
|
||||||
|
default:
|
||||||
|
return errors.Errorf("invalid encoding %q", c.Encoding())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
117
compact.go
117
compact.go
|
@ -48,22 +48,22 @@ type Compactor interface {
|
||||||
// Plan returns a set of non-overlapping directories that can
|
// Plan returns a set of non-overlapping directories that can
|
||||||
// be compacted concurrently.
|
// be compacted concurrently.
|
||||||
// Results returned when compactions are in progress are undefined.
|
// Results returned when compactions are in progress are undefined.
|
||||||
Plan() ([][]string, error)
|
Plan(dir string) ([]string, error)
|
||||||
|
|
||||||
// Write persists a Block into a directory.
|
// Write persists a Block into a directory.
|
||||||
Write(b Block) error
|
Write(dest string, b Block) error
|
||||||
|
|
||||||
// Compact runs compaction against the provided directories. Must
|
// Compact runs compaction against the provided directories. Must
|
||||||
// only be called concurrently with results of Plan().
|
// only be called concurrently with results of Plan().
|
||||||
Compact(dirs ...string) error
|
Compact(dest string, dirs ...string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// compactor implements the Compactor interface.
|
// LeveledCompactor implements the Compactor interface.
|
||||||
type compactor struct {
|
type LeveledCompactor struct {
|
||||||
dir string
|
dir string
|
||||||
metrics *compactorMetrics
|
metrics *compactorMetrics
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
opts *compactorOptions
|
opts *LeveledCompactorOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactorMetrics struct {
|
type compactorMetrics struct {
|
||||||
|
@ -98,13 +98,18 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactorOptions struct {
|
type LeveledCompactorOptions struct {
|
||||||
blockRanges []int64
|
blockRanges []int64
|
||||||
|
chunkPool chunks.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
|
func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, opts *LeveledCompactorOptions) *LeveledCompactor {
|
||||||
return &compactor{
|
if opts == nil {
|
||||||
dir: dir,
|
opts = &LeveledCompactorOptions{
|
||||||
|
chunkPool: chunks.NewPool(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &LeveledCompactor{
|
||||||
opts: opts,
|
opts: opts,
|
||||||
logger: l,
|
logger: l,
|
||||||
metrics: newCompactorMetrics(r),
|
metrics: newCompactorMetrics(r),
|
||||||
|
@ -124,8 +129,9 @@ type dirMeta struct {
|
||||||
meta *BlockMeta
|
meta *BlockMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) Plan() ([][]string, error) {
|
// Plan returns a list of compactable blocks in the provided directory.
|
||||||
dirs, err := blockDirs(c.dir)
|
func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
|
||||||
|
dirs, err := blockDirs(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -137,7 +143,7 @@ func (c *compactor) Plan() ([][]string, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if meta.Compaction.Generation > 0 {
|
if meta.Compaction.Level > 0 {
|
||||||
dms = append(dms, dirMeta{dir, meta})
|
dms = append(dms, dirMeta{dir, meta})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -149,20 +155,12 @@ func (c *compactor) Plan() ([][]string, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
sliceDirs := func(dms []dirMeta) [][]string {
|
var res []string
|
||||||
if len(dms) == 0 {
|
for _, dm := range c.selectDirs(dms) {
|
||||||
return nil
|
res = append(res, dm.dir)
|
||||||
}
|
|
||||||
var res []string
|
|
||||||
for _, dm := range dms {
|
|
||||||
res = append(res, dm.dir)
|
|
||||||
}
|
|
||||||
return [][]string{res}
|
|
||||||
}
|
}
|
||||||
|
if len(res) > 0 {
|
||||||
planDirs := sliceDirs(c.selectDirs(dms))
|
return res, nil
|
||||||
if len(dirs) > 1 {
|
|
||||||
return planDirs, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compact any blocks that have >5% tombstones.
|
// Compact any blocks that have >5% tombstones.
|
||||||
|
@ -173,7 +171,7 @@ func (c *compactor) Plan() ([][]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if meta.Stats.NumSeries/meta.Stats.NumTombstones <= 20 { // 5%
|
if meta.Stats.NumSeries/meta.Stats.NumTombstones <= 20 { // 5%
|
||||||
return [][]string{{dms[i].dir}}, nil
|
return []string{dms[i].dir}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,7 +180,7 @@ func (c *compactor) Plan() ([][]string, error) {
|
||||||
|
|
||||||
// selectDirs returns the dir metas that should be compacted into a single new block.
|
// selectDirs returns the dir metas that should be compacted into a single new block.
|
||||||
// If only a single block range is configured, the result is always nil.
|
// If only a single block range is configured, the result is always nil.
|
||||||
func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
|
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
|
||||||
if len(c.opts.blockRanges) < 2 || len(ds) < 1 {
|
if len(c.opts.blockRanges) < 2 || len(ds) < 1 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -261,18 +259,18 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
|
||||||
sources := map[ulid.ULID]struct{}{}
|
sources := map[ulid.ULID]struct{}{}
|
||||||
|
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
if b.Compaction.Generation > res.Compaction.Generation {
|
if b.Compaction.Level > res.Compaction.Level {
|
||||||
res.Compaction.Generation = b.Compaction.Generation
|
res.Compaction.Level = b.Compaction.Level
|
||||||
}
|
}
|
||||||
for _, s := range b.Compaction.Sources {
|
for _, s := range b.Compaction.Sources {
|
||||||
sources[s] = struct{}{}
|
sources[s] = struct{}{}
|
||||||
}
|
}
|
||||||
// If it's an in memory block, its ULID goes into the sources.
|
// If it's an in memory block, its ULID goes into the sources.
|
||||||
if b.Compaction.Generation == 0 {
|
if b.Compaction.Level == 0 {
|
||||||
sources[b.ULID] = struct{}{}
|
sources[b.ULID] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
res.Compaction.Generation++
|
res.Compaction.Level++
|
||||||
|
|
||||||
for s := range sources {
|
for s := range sources {
|
||||||
res.Compaction.Sources = append(res.Compaction.Sources, s)
|
res.Compaction.Sources = append(res.Compaction.Sources, s)
|
||||||
|
@ -284,11 +282,13 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) Compact(dirs ...string) (err error) {
|
// Compact creates a new block in the compactor's directory from the blocks in the
|
||||||
|
// provided directories.
|
||||||
|
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
|
||||||
var blocks []Block
|
var blocks []Block
|
||||||
|
|
||||||
for _, d := range dirs {
|
for _, d := range dirs {
|
||||||
b, err := newPersistedBlock(d)
|
b, err := newPersistedBlock(d, c.opts.chunkPool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -300,24 +300,24 @@ func (c *compactor) Compact(dirs ...string) (err error) {
|
||||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
uid := ulid.MustNew(ulid.Now(), entropy)
|
uid := ulid.MustNew(ulid.Now(), entropy)
|
||||||
|
|
||||||
return c.write(uid, blocks...)
|
return c.write(dest, uid, blocks...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) Write(b Block) error {
|
func (c *LeveledCompactor) Write(dest string, b Block) error {
|
||||||
// Buffering blocks might have been created that often have no data.
|
// Buffering blocks might have been created that often have no data.
|
||||||
if b.Meta().Stats.NumSeries == 0 {
|
if b.Meta().Stats.NumSeries == 0 {
|
||||||
return errors.Wrap(os.RemoveAll(b.Dir()), "remove empty block")
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
uid := ulid.MustNew(ulid.Now(), entropy)
|
uid := ulid.MustNew(ulid.Now(), entropy)
|
||||||
|
|
||||||
return c.write(uid, b)
|
return c.write(dest, uid, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
// write creates a new block that is the union of the provided blocks into dir.
|
// write creates a new block that is the union of the provided blocks into dir.
|
||||||
// It cleans up all files of the old blocks after completing successfully.
|
// It cleans up all files of the old blocks after completing successfully.
|
||||||
func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
|
c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
|
||||||
|
|
||||||
defer func(t time.Time) {
|
defer func(t time.Time) {
|
||||||
|
@ -328,7 +328,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
c.metrics.duration.Observe(time.Since(t).Seconds())
|
c.metrics.duration.Observe(time.Since(t).Seconds())
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
dir := filepath.Join(c.dir, uid.String())
|
dir := filepath.Join(dest, uid.String())
|
||||||
tmp := dir + ".tmp"
|
tmp := dir + ".tmp"
|
||||||
|
|
||||||
if err = os.RemoveAll(tmp); err != nil {
|
if err = os.RemoveAll(tmp); err != nil {
|
||||||
|
@ -350,7 +350,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
return errors.Wrap(err, "open index writer")
|
return errors.Wrap(err, "open index writer")
|
||||||
}
|
}
|
||||||
|
|
||||||
meta, err := populateBlock(blocks, indexw, chunkw)
|
meta, err := c.populateBlock(blocks, indexw, chunkw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "write compaction")
|
return errors.Wrap(err, "write compaction")
|
||||||
}
|
}
|
||||||
|
@ -376,11 +376,6 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
if err := renameFile(tmp, dir); err != nil {
|
if err := renameFile(tmp, dir); err != nil {
|
||||||
return errors.Wrap(err, "rename block dir")
|
return errors.Wrap(err, "rename block dir")
|
||||||
}
|
}
|
||||||
for _, b := range blocks {
|
|
||||||
if err := os.RemoveAll(b.Dir()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Properly sync parent dir to ensure changes are visible.
|
// Properly sync parent dir to ensure changes are visible.
|
||||||
df, err := fileutil.OpenDir(dir)
|
df, err := fileutil.OpenDir(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -397,7 +392,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
||||||
|
|
||||||
// populateBlock fills the index and chunk writers with new data gathered as the union
|
// populateBlock fills the index and chunk writers with new data gathered as the union
|
||||||
// of the provided blocks. It returns meta information for the new block.
|
// of the provided blocks. It returns meta information for the new block.
|
||||||
func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
|
func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
|
||||||
var (
|
var (
|
||||||
set compactionSet
|
set compactionSet
|
||||||
metas []BlockMeta
|
metas []BlockMeta
|
||||||
|
@ -474,7 +469,6 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := chunkw.WriteChunks(chks...); err != nil {
|
if err := chunkw.WriteChunks(chks...); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -489,6 +483,10 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
||||||
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
|
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, chk := range chks {
|
||||||
|
c.opts.chunkPool.Put(chk.Chunk)
|
||||||
|
}
|
||||||
|
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
valset, ok := values[l.Name]
|
valset, ok := values[l.Name]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -497,7 +495,9 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
||||||
}
|
}
|
||||||
valset.set(l.Value)
|
valset.set(l.Value)
|
||||||
|
|
||||||
postings.add(i, term{name: l.Name, value: l.Value})
|
t := term{name: l.Name, value: l.Value}
|
||||||
|
|
||||||
|
postings.add(i, t)
|
||||||
}
|
}
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
@ -536,7 +536,7 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
||||||
|
|
||||||
type compactionSet interface {
|
type compactionSet interface {
|
||||||
Next() bool
|
Next() bool
|
||||||
At() (labels.Labels, []*ChunkMeta, intervals)
|
At() (labels.Labels, []ChunkMeta, intervals)
|
||||||
Err() error
|
Err() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -548,7 +548,7 @@ type compactionSeriesSet struct {
|
||||||
series SeriesSet
|
series SeriesSet
|
||||||
|
|
||||||
l labels.Labels
|
l labels.Labels
|
||||||
c []*ChunkMeta
|
c []ChunkMeta
|
||||||
intervals intervals
|
intervals intervals
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
@ -574,7 +574,7 @@ func (c *compactionSeriesSet) Next() bool {
|
||||||
|
|
||||||
// Remove completely deleted chunks.
|
// Remove completely deleted chunks.
|
||||||
if len(c.intervals) > 0 {
|
if len(c.intervals) > 0 {
|
||||||
chks := make([]*ChunkMeta, 0, len(c.c))
|
chks := make([]ChunkMeta, 0, len(c.c))
|
||||||
for _, chk := range c.c {
|
for _, chk := range c.c {
|
||||||
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
|
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
|
||||||
chks = append(chks, chk)
|
chks = append(chks, chk)
|
||||||
|
@ -584,7 +584,9 @@ func (c *compactionSeriesSet) Next() bool {
|
||||||
c.c = chks
|
c.c = chks
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, chk := range c.c {
|
for i := range c.c {
|
||||||
|
chk := &c.c[i]
|
||||||
|
|
||||||
chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
|
chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
|
||||||
if c.err != nil {
|
if c.err != nil {
|
||||||
return false
|
return false
|
||||||
|
@ -601,7 +603,7 @@ func (c *compactionSeriesSet) Err() error {
|
||||||
return c.p.Err()
|
return c.p.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) {
|
func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, intervals) {
|
||||||
return c.l, c.c, c.intervals
|
return c.l, c.c, c.intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -610,7 +612,7 @@ type compactionMerger struct {
|
||||||
|
|
||||||
aok, bok bool
|
aok, bok bool
|
||||||
l labels.Labels
|
l labels.Labels
|
||||||
c []*ChunkMeta
|
c []ChunkMeta
|
||||||
intervals intervals
|
intervals intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -651,7 +653,7 @@ func (c *compactionMerger) Next() bool {
|
||||||
// While advancing child iterators the memory used for labels and chunks
|
// While advancing child iterators the memory used for labels and chunks
|
||||||
// may be reused. When picking a series we have to store the result.
|
// may be reused. When picking a series we have to store the result.
|
||||||
var lset labels.Labels
|
var lset labels.Labels
|
||||||
var chks []*ChunkMeta
|
var chks []ChunkMeta
|
||||||
|
|
||||||
d := c.compare()
|
d := c.compare()
|
||||||
// Both sets contain the current series. Chain them into a single one.
|
// Both sets contain the current series. Chain them into a single one.
|
||||||
|
@ -681,6 +683,7 @@ func (c *compactionMerger) Next() bool {
|
||||||
c.aok = c.a.Next()
|
c.aok = c.a.Next()
|
||||||
c.bok = c.b.Next()
|
c.bok = c.b.Next()
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -691,7 +694,7 @@ func (c *compactionMerger) Err() error {
|
||||||
return c.b.Err()
|
return c.b.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta, intervals) {
|
func (c *compactionMerger) At() (labels.Labels, []ChunkMeta, intervals) {
|
||||||
return c.l, c.c, c.intervals
|
return c.l, c.c, c.intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,8 +19,8 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCompactionSelect(t *testing.T) {
|
func TestLeveledCompactor_Select(t *testing.T) {
|
||||||
opts := &compactorOptions{
|
opts := &LeveledCompactorOptions{
|
||||||
blockRanges: []int64{
|
blockRanges: []int64{
|
||||||
20,
|
20,
|
||||||
60,
|
60,
|
||||||
|
@ -173,7 +173,7 @@ func TestCompactionSelect(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
c := &compactor{
|
c := &LeveledCompactor{
|
||||||
opts: opts,
|
opts: opts,
|
||||||
}
|
}
|
||||||
sliceDirs := func(dms []dirMeta) [][]string {
|
sliceDirs := func(dms []dirMeta) [][]string {
|
||||||
|
|
55
db.go
55
db.go
|
@ -37,6 +37,7 @@ import (
|
||||||
"github.com/oklog/ulid"
|
"github.com/oklog/ulid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -95,9 +96,10 @@ type DB struct {
|
||||||
dir string
|
dir string
|
||||||
lockf *lockfile.Lockfile
|
lockf *lockfile.Lockfile
|
||||||
|
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
metrics *dbMetrics
|
metrics *dbMetrics
|
||||||
opts *Options
|
opts *Options
|
||||||
|
chunkPool chunks.Pool
|
||||||
|
|
||||||
// Mutex for that must be held when modifying the general block layout.
|
// Mutex for that must be held when modifying the general block layout.
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
@ -203,6 +205,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
donec: make(chan struct{}),
|
donec: make(chan struct{}),
|
||||||
stopc: make(chan struct{}),
|
stopc: make(chan struct{}),
|
||||||
compactionsEnabled: true,
|
compactionsEnabled: true,
|
||||||
|
chunkPool: chunks.NewPool(),
|
||||||
}
|
}
|
||||||
db.metrics = newDBMetrics(db, r)
|
db.metrics = newDBMetrics(db, r)
|
||||||
|
|
||||||
|
@ -221,8 +224,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
db.lockf = &lockf
|
db.lockf = &lockf
|
||||||
}
|
}
|
||||||
|
|
||||||
copts := &compactorOptions{
|
copts := &LeveledCompactorOptions{
|
||||||
blockRanges: opts.BlockRanges,
|
blockRanges: opts.BlockRanges,
|
||||||
|
chunkPool: db.chunkPool,
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(copts.blockRanges) == 0 {
|
if len(copts.blockRanges) == 0 {
|
||||||
|
@ -238,7 +242,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
|
copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
|
||||||
}
|
}
|
||||||
|
|
||||||
db.compactor = newCompactor(dir, r, l, copts)
|
db.compactor = NewLeveledCompactor(r, l, copts)
|
||||||
|
|
||||||
if err := db.reloadBlocks(); err != nil {
|
if err := db.reloadBlocks(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -386,20 +390,24 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = db.compactor.Write(h); err != nil {
|
if err = db.compactor.Write(db.dir, h); err != nil {
|
||||||
return changes, errors.Wrap(err, "persist head block")
|
return changes, errors.Wrap(err, "persist head block")
|
||||||
}
|
}
|
||||||
changes = true
|
changes = true
|
||||||
|
|
||||||
|
if err := os.RemoveAll(h.Dir()); err != nil {
|
||||||
|
return changes, errors.Wrap(err, "delete compacted head block")
|
||||||
|
}
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for compactions of multiple blocks.
|
// Check for compactions of multiple blocks.
|
||||||
for {
|
for {
|
||||||
plans, err := db.compactor.Plan()
|
plan, err := db.compactor.Plan(db.dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return changes, errors.Wrap(err, "plan compaction")
|
return changes, errors.Wrap(err, "plan compaction")
|
||||||
}
|
}
|
||||||
if len(plans) == 0 {
|
if len(plan) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -409,17 +417,17 @@ func (db *DB) compact() (changes bool, err error) {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
// We just execute compactions sequentially to not cause too extreme
|
if err := db.compactor.Compact(db.dir, plan...); err != nil {
|
||||||
// CPU and memory spikes.
|
return changes, errors.Wrapf(err, "compact %s", plan)
|
||||||
// TODO(fabxc): return more descriptive plans in the future that allow
|
|
||||||
// estimation of resource usage and conditional parallelization?
|
|
||||||
for _, p := range plans {
|
|
||||||
if err := db.compactor.Compact(p...); err != nil {
|
|
||||||
return changes, errors.Wrapf(err, "compact %s", p)
|
|
||||||
}
|
|
||||||
changes = true
|
|
||||||
runtime.GC()
|
|
||||||
}
|
}
|
||||||
|
changes = true
|
||||||
|
|
||||||
|
for _, pd := range plan {
|
||||||
|
if err := os.RemoveAll(pd); err != nil {
|
||||||
|
return changes, errors.Wrap(err, "delete compacted block")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
runtime.GC()
|
||||||
}
|
}
|
||||||
|
|
||||||
return changes, nil
|
return changes, nil
|
||||||
|
@ -505,10 +513,10 @@ func (db *DB) reloadBlocks() (err error) {
|
||||||
|
|
||||||
b, ok := db.getBlock(meta.ULID)
|
b, ok := db.getBlock(meta.ULID)
|
||||||
if !ok {
|
if !ok {
|
||||||
if meta.Compaction.Generation == 0 {
|
if meta.Compaction.Level == 0 {
|
||||||
b, err = db.openHeadBlock(dir)
|
b, err = db.openHeadBlock(dir)
|
||||||
} else {
|
} else {
|
||||||
b, err = newPersistedBlock(dir)
|
b, err = newPersistedBlock(dir, db.chunkPool)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "open block %s", dir)
|
return errors.Wrapf(err, "open block %s", dir)
|
||||||
|
@ -534,7 +542,7 @@ func (db *DB) reloadBlocks() (err error) {
|
||||||
db.heads = nil
|
db.heads = nil
|
||||||
|
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
if b.Meta().Compaction.Generation == 0 {
|
if b.Meta().Compaction.Level == 0 {
|
||||||
db.heads = append(db.heads, b.(*HeadBlock))
|
db.heads = append(db.heads, b.(*HeadBlock))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -603,6 +611,9 @@ func (db *DB) EnableCompactions() {
|
||||||
|
|
||||||
// Snapshot writes the current data to the directory.
|
// Snapshot writes the current data to the directory.
|
||||||
func (db *DB) Snapshot(dir string) error {
|
func (db *DB) Snapshot(dir string) error {
|
||||||
|
if dir == db.dir {
|
||||||
|
return errors.Errorf("cannot snapshot into base directory")
|
||||||
|
}
|
||||||
db.cmtx.Lock()
|
db.cmtx.Lock()
|
||||||
defer db.cmtx.Unlock()
|
defer db.cmtx.Unlock()
|
||||||
|
|
||||||
|
@ -869,7 +880,7 @@ func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
|
||||||
return nil, errors.Wrap(err, "open WAL %s")
|
return nil, errors.Wrap(err, "open WAL %s")
|
||||||
}
|
}
|
||||||
|
|
||||||
h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal)
|
h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal, db.compactor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "open head block %s", dir)
|
return nil, errors.Wrapf(err, "open head block %s", dir)
|
||||||
}
|
}
|
||||||
|
|
74
head.go
74
head.go
|
@ -52,9 +52,10 @@ var (
|
||||||
|
|
||||||
// HeadBlock handles reads and writes of time series data within a time window.
|
// HeadBlock handles reads and writes of time series data within a time window.
|
||||||
type HeadBlock struct {
|
type HeadBlock struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
dir string
|
dir string
|
||||||
wal WAL
|
wal WAL
|
||||||
|
compactor Compactor
|
||||||
|
|
||||||
activeWriters uint64
|
activeWriters uint64
|
||||||
highTimestamp int64
|
highTimestamp int64
|
||||||
|
@ -106,7 +107,7 @@ func TouchHeadBlock(dir string, mint, maxt int64) (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenHeadBlock opens the head block in dir.
|
// OpenHeadBlock opens the head block in dir.
|
||||||
func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
|
func OpenHeadBlock(dir string, l log.Logger, wal WAL, c Compactor) (*HeadBlock, error) {
|
||||||
meta, err := readMetaFile(dir)
|
meta, err := readMetaFile(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -115,6 +116,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
|
||||||
h := &HeadBlock{
|
h := &HeadBlock{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
wal: wal,
|
wal: wal,
|
||||||
|
compactor: c,
|
||||||
series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
|
series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
|
||||||
hashes: map[uint64][]*memSeries{},
|
hashes: map[uint64][]*memSeries{},
|
||||||
values: map[string]stringset{},
|
values: map[string]stringset{},
|
||||||
|
@ -266,68 +268,14 @@ Outer:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot persists the current state of the headblock to the given directory.
|
// Snapshot persists the current state of the headblock to the given directory.
|
||||||
// TODO(gouthamve): Snapshot must be called when there are no active appenders.
|
// Callers must ensure that there are no active appenders against the block.
|
||||||
// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should
|
// DB does this by acquiring its own write lock.
|
||||||
// be removed in the future.
|
|
||||||
func (h *HeadBlock) Snapshot(snapshotDir string) error {
|
func (h *HeadBlock) Snapshot(snapshotDir string) error {
|
||||||
if h.meta.Stats.NumSeries == 0 {
|
if h.meta.Stats.NumSeries == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
return h.compactor.Write(snapshotDir, h)
|
||||||
uid := ulid.MustNew(ulid.Now(), entropy)
|
|
||||||
|
|
||||||
dir := filepath.Join(snapshotDir, uid.String())
|
|
||||||
tmp := dir + ".tmp"
|
|
||||||
|
|
||||||
if err := os.RemoveAll(tmp); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := os.MkdirAll(tmp, 0777); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Populate chunk and index files into temporary directory with
|
|
||||||
// data of all blocks.
|
|
||||||
chunkw, err := newChunkWriter(chunkDir(tmp))
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrap(err, "open chunk writer")
|
|
||||||
}
|
|
||||||
indexw, err := newIndexWriter(tmp)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrap(err, "open index writer")
|
|
||||||
}
|
|
||||||
|
|
||||||
meta, err := populateBlock([]Block{h}, indexw, chunkw)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrap(err, "write snapshot")
|
|
||||||
}
|
|
||||||
meta.ULID = uid
|
|
||||||
meta.MaxTime = h.highTimestamp
|
|
||||||
|
|
||||||
if err = writeMetaFile(tmp, meta); err != nil {
|
|
||||||
return errors.Wrap(err, "write merged meta")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = chunkw.Close(); err != nil {
|
|
||||||
return errors.Wrap(err, "close chunk writer")
|
|
||||||
}
|
|
||||||
if err = indexw.Close(); err != nil {
|
|
||||||
return errors.Wrap(err, "close index writer")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create an empty tombstones file.
|
|
||||||
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
|
|
||||||
return errors.Wrap(err, "write new tombstones file")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Block successfully written, make visible
|
|
||||||
if err := renameFile(tmp, dir); err != nil {
|
|
||||||
return errors.Wrap(err, "rename block dir")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dir returns the directory of the block.
|
// Dir returns the directory of the block.
|
||||||
|
@ -702,7 +650,7 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Series returns the series for the given reference.
|
// Series returns the series for the given reference.
|
||||||
func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error {
|
func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error {
|
||||||
h.mtx.RLock()
|
h.mtx.RLock()
|
||||||
defer h.mtx.RUnlock()
|
defer h.mtx.RUnlock()
|
||||||
|
|
||||||
|
@ -722,7 +670,7 @@ func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*Chunk
|
||||||
*chks = (*chks)[:0]
|
*chks = (*chks)[:0]
|
||||||
|
|
||||||
for i, c := range s.chunks {
|
for i, c := range s.chunks {
|
||||||
*chks = append(*chks, &ChunkMeta{
|
*chks = append(*chks, ChunkMeta{
|
||||||
MinTime: c.minTime,
|
MinTime: c.minTime,
|
||||||
MaxTime: c.maxTime,
|
MaxTime: c.maxTime,
|
||||||
Ref: (uint64(ref) << 32) | uint64(i),
|
Ref: (uint64(ref) << 32) | uint64(i),
|
||||||
|
|
|
@ -43,7 +43,7 @@ func openTestHeadBlock(t testing.TB, dir string) *HeadBlock {
|
||||||
wal, err := OpenSegmentWAL(dir, nil, 5*time.Second)
|
wal, err := OpenSegmentWAL(dir, nil, 5*time.Second)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
h, err := OpenHeadBlock(dir, nil, wal)
|
h, err := OpenHeadBlock(dir, nil, wal, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
return h
|
return h
|
||||||
}
|
}
|
||||||
|
|
15
index.go
15
index.go
|
@ -45,8 +45,8 @@ const compactionPageBytes = minSectorSize * 64
|
||||||
|
|
||||||
type indexWriterSeries struct {
|
type indexWriterSeries struct {
|
||||||
labels labels.Labels
|
labels labels.Labels
|
||||||
chunks []*ChunkMeta // series file offset of chunks
|
chunks []ChunkMeta // series file offset of chunks
|
||||||
offset uint32 // index file offset of series reference
|
offset uint32 // index file offset of series reference
|
||||||
}
|
}
|
||||||
|
|
||||||
type indexWriterSeriesSlice []*indexWriterSeries
|
type indexWriterSeriesSlice []*indexWriterSeries
|
||||||
|
@ -100,7 +100,7 @@ type IndexWriter interface {
|
||||||
// their labels.
|
// their labels.
|
||||||
// The reference numbers are used to resolve entries in postings lists that
|
// The reference numbers are used to resolve entries in postings lists that
|
||||||
// are added later.
|
// are added later.
|
||||||
AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error
|
AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error
|
||||||
|
|
||||||
// WriteLabelIndex serializes an index from label names to values.
|
// WriteLabelIndex serializes an index from label names to values.
|
||||||
// The passed in values chained tuples of strings of the length of names.
|
// The passed in values chained tuples of strings of the length of names.
|
||||||
|
@ -261,7 +261,7 @@ func (w *indexWriter) writeMeta() error {
|
||||||
return w.write(w.buf1.get())
|
return w.write(w.buf1.get())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
|
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error {
|
||||||
if err := w.ensureStage(idxStageSeries); err != nil {
|
if err := w.ensureStage(idxStageSeries); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -471,6 +471,7 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
|
||||||
for _, r := range refs {
|
for _, r := range refs {
|
||||||
w.buf2.putBE32(r)
|
w.buf2.putBE32(r)
|
||||||
}
|
}
|
||||||
|
w.uint32s = refs
|
||||||
|
|
||||||
w.buf1.reset()
|
w.buf1.reset()
|
||||||
w.buf1.putBE32int(w.buf2.len())
|
w.buf1.putBE32int(w.buf2.len())
|
||||||
|
@ -524,7 +525,7 @@ type IndexReader interface {
|
||||||
|
|
||||||
// Series populates the given labels and chunk metas for the series identified
|
// Series populates the given labels and chunk metas for the series identified
|
||||||
// by the reference.
|
// by the reference.
|
||||||
Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error
|
Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error
|
||||||
|
|
||||||
// LabelIndices returns the label pairs for which indices exist.
|
// LabelIndices returns the label pairs for which indices exist.
|
||||||
LabelIndices() ([][]string, error)
|
LabelIndices() ([][]string, error)
|
||||||
|
@ -740,7 +741,7 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error {
|
func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error {
|
||||||
d1 := r.decbufAt(int(ref))
|
d1 := r.decbufAt(int(ref))
|
||||||
d2 := d1.decbuf(int(d1.uvarint()))
|
d2 := d1.decbuf(int(d1.uvarint()))
|
||||||
|
|
||||||
|
@ -781,7 +782,7 @@ func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta
|
||||||
return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
|
return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
*chks = append(*chks, &ChunkMeta{
|
*chks = append(*chks, ChunkMeta{
|
||||||
Ref: off,
|
Ref: off,
|
||||||
MinTime: mint,
|
MinTime: mint,
|
||||||
MaxTime: maxt,
|
MaxTime: maxt,
|
||||||
|
|
|
@ -29,7 +29,7 @@ import (
|
||||||
|
|
||||||
type series struct {
|
type series struct {
|
||||||
l labels.Labels
|
l labels.Labels
|
||||||
chunks []*ChunkMeta
|
chunks []ChunkMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockIndex struct {
|
type mockIndex struct {
|
||||||
|
@ -52,7 +52,7 @@ func (m mockIndex) Symbols() (map[string]struct{}, error) {
|
||||||
return m.symbols, nil
|
return m.symbols, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error {
|
func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error {
|
||||||
if _, ok := m.series[ref]; ok {
|
if _, ok := m.series[ref]; ok {
|
||||||
return errors.Errorf("series with reference %d already added", ref)
|
return errors.Errorf("series with reference %d already added", ref)
|
||||||
}
|
}
|
||||||
|
@ -64,9 +64,8 @@ func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta)
|
||||||
s := series{l: l}
|
s := series{l: l}
|
||||||
// Actual chunk data is not stored in the index.
|
// Actual chunk data is not stored in the index.
|
||||||
for _, c := range chunks {
|
for _, c := range chunks {
|
||||||
cc := *c
|
c.Chunk = nil
|
||||||
cc.Chunk = nil
|
s.chunks = append(s.chunks, c)
|
||||||
s.chunks = append(s.chunks, &cc)
|
|
||||||
}
|
}
|
||||||
m.series[ref] = s
|
m.series[ref] = s
|
||||||
|
|
||||||
|
@ -126,7 +125,7 @@ func (m mockIndex) SortedPostings(p Postings) Postings {
|
||||||
return newListPostings(ep)
|
return newListPostings(ep)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error {
|
func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error {
|
||||||
s, ok := m.series[ref]
|
s, ok := m.series[ref]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrNotFound
|
return ErrNotFound
|
||||||
|
@ -215,7 +214,7 @@ func TestIndexRW_Postings(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var l labels.Labels
|
var l labels.Labels
|
||||||
var c []*ChunkMeta
|
var c []ChunkMeta
|
||||||
|
|
||||||
for i := 0; p.Next(); i++ {
|
for i := 0; p.Next(); i++ {
|
||||||
err := ir.Series(p.At(), &l, &c)
|
err := ir.Series(p.At(), &l, &c)
|
||||||
|
@ -252,10 +251,10 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||||
|
|
||||||
// Generate ChunkMetas for every label set.
|
// Generate ChunkMetas for every label set.
|
||||||
for i, lset := range lbls {
|
for i, lset := range lbls {
|
||||||
var metas []*ChunkMeta
|
var metas []ChunkMeta
|
||||||
|
|
||||||
for j := 0; j <= (i % 20); j++ {
|
for j := 0; j <= (i % 20); j++ {
|
||||||
metas = append(metas, &ChunkMeta{
|
metas = append(metas, ChunkMeta{
|
||||||
MinTime: int64(j * 10000),
|
MinTime: int64(j * 10000),
|
||||||
MaxTime: int64((j + 1) * 10000),
|
MaxTime: int64((j + 1) * 10000),
|
||||||
Ref: rand.Uint64(),
|
Ref: rand.Uint64(),
|
||||||
|
@ -333,7 +332,7 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||||
expp, err := mi.Postings(p.name, p.value)
|
expp, err := mi.Postings(p.name, p.value)
|
||||||
|
|
||||||
var lset, explset labels.Labels
|
var lset, explset labels.Labels
|
||||||
var chks, expchks []*ChunkMeta
|
var chks, expchks []ChunkMeta
|
||||||
|
|
||||||
for gotp.Next() {
|
for gotp.Next() {
|
||||||
require.True(t, expp.Next())
|
require.True(t, expp.Next())
|
||||||
|
|
26
querier.go
26
querier.go
|
@ -403,7 +403,7 @@ func (s *mergedSeriesSet) Next() bool {
|
||||||
|
|
||||||
type chunkSeriesSet interface {
|
type chunkSeriesSet interface {
|
||||||
Next() bool
|
Next() bool
|
||||||
At() (labels.Labels, []*ChunkMeta, intervals)
|
At() (labels.Labels, []ChunkMeta, intervals)
|
||||||
Err() error
|
Err() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,12 +416,12 @@ type baseChunkSeries struct {
|
||||||
absent []string // labels that must be unset in results.
|
absent []string // labels that must be unset in results.
|
||||||
|
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
chks []*ChunkMeta
|
chks []ChunkMeta
|
||||||
intervals intervals
|
intervals intervals
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) {
|
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, intervals) {
|
||||||
return s.lset, s.chks, s.intervals
|
return s.lset, s.chks, s.intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -430,7 +430,7 @@ func (s *baseChunkSeries) Err() error { return s.err }
|
||||||
func (s *baseChunkSeries) Next() bool {
|
func (s *baseChunkSeries) Next() bool {
|
||||||
var (
|
var (
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
chunks []*ChunkMeta
|
chunks []ChunkMeta
|
||||||
)
|
)
|
||||||
Outer:
|
Outer:
|
||||||
for s.p.Next() {
|
for s.p.Next() {
|
||||||
|
@ -453,7 +453,7 @@ Outer:
|
||||||
|
|
||||||
if len(s.intervals) > 0 {
|
if len(s.intervals) > 0 {
|
||||||
// Only those chunks that are not entirely deleted.
|
// Only those chunks that are not entirely deleted.
|
||||||
chks := make([]*ChunkMeta, 0, len(s.chks))
|
chks := make([]ChunkMeta, 0, len(s.chks))
|
||||||
for _, chk := range s.chks {
|
for _, chk := range s.chks {
|
||||||
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) {
|
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) {
|
||||||
chks = append(chks, chk)
|
chks = append(chks, chk)
|
||||||
|
@ -480,12 +480,12 @@ type populatedChunkSeries struct {
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
|
|
||||||
err error
|
err error
|
||||||
chks []*ChunkMeta
|
chks []ChunkMeta
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
intervals intervals
|
intervals intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) {
|
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, intervals) {
|
||||||
return s.lset, s.chks, s.intervals
|
return s.lset, s.chks, s.intervals
|
||||||
}
|
}
|
||||||
func (s *populatedChunkSeries) Err() error { return s.err }
|
func (s *populatedChunkSeries) Err() error { return s.err }
|
||||||
|
@ -501,8 +501,10 @@ func (s *populatedChunkSeries) Next() bool {
|
||||||
chks = chks[1:]
|
chks = chks[1:]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Break out at the first chunk that has no overlap with mint, maxt.
|
for i := range chks {
|
||||||
for i, c := range chks {
|
c := &chks[i]
|
||||||
|
|
||||||
|
// Break out at the first chunk that has no overlap with mint, maxt.
|
||||||
if c.MinTime > s.maxt {
|
if c.MinTime > s.maxt {
|
||||||
chks = chks[:i]
|
chks = chks[:i]
|
||||||
break
|
break
|
||||||
|
@ -564,7 +566,7 @@ func (s *blockSeriesSet) Err() error { return s.err }
|
||||||
// time series data.
|
// time series data.
|
||||||
type chunkSeries struct {
|
type chunkSeries struct {
|
||||||
labels labels.Labels
|
labels labels.Labels
|
||||||
chunks []*ChunkMeta // in-order chunk refs
|
chunks []ChunkMeta // in-order chunk refs
|
||||||
|
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
|
|
||||||
|
@ -667,7 +669,7 @@ func (it *chainedSeriesIterator) Err() error {
|
||||||
// chunkSeriesIterator implements a series iterator on top
|
// chunkSeriesIterator implements a series iterator on top
|
||||||
// of a list of time-sorted, non-overlapping chunks.
|
// of a list of time-sorted, non-overlapping chunks.
|
||||||
type chunkSeriesIterator struct {
|
type chunkSeriesIterator struct {
|
||||||
chunks []*ChunkMeta
|
chunks []ChunkMeta
|
||||||
|
|
||||||
i int
|
i int
|
||||||
cur chunks.Iterator
|
cur chunks.Iterator
|
||||||
|
@ -677,7 +679,7 @@ type chunkSeriesIterator struct {
|
||||||
intervals intervals
|
intervals intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func newChunkSeriesIterator(cs []*ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator {
|
func newChunkSeriesIterator(cs []ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator {
|
||||||
it := cs[0].Chunk.Iterator()
|
it := cs[0].Chunk.Iterator()
|
||||||
if len(dranges) > 0 {
|
if len(dranges) > 0 {
|
||||||
it = &deletedIterator{it: it, intervals: dranges}
|
it = &deletedIterator{it: it, intervals: dranges}
|
||||||
|
|
|
@ -235,12 +235,12 @@ func createIdxChkReaders(tc []struct {
|
||||||
|
|
||||||
for i, s := range tc {
|
for i, s := range tc {
|
||||||
i = i + 1 // 0 is not a valid posting.
|
i = i + 1 // 0 is not a valid posting.
|
||||||
metas := make([]*ChunkMeta, 0, len(s.chunks))
|
metas := make([]ChunkMeta, 0, len(s.chunks))
|
||||||
for _, chk := range s.chunks {
|
for _, chk := range s.chunks {
|
||||||
// Collisions can be there, but for tests, its fine.
|
// Collisions can be there, but for tests, its fine.
|
||||||
ref := rand.Uint64()
|
ref := rand.Uint64()
|
||||||
|
|
||||||
metas = append(metas, &ChunkMeta{
|
metas = append(metas, ChunkMeta{
|
||||||
MinTime: chk[0].t,
|
MinTime: chk[0].t,
|
||||||
MaxTime: chk[len(chk)-1].t,
|
MaxTime: chk[len(chk)-1].t,
|
||||||
Ref: ref,
|
Ref: ref,
|
||||||
|
@ -661,7 +661,7 @@ Outer:
|
||||||
func TestBaseChunkSeries(t *testing.T) {
|
func TestBaseChunkSeries(t *testing.T) {
|
||||||
type refdSeries struct {
|
type refdSeries struct {
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
chunks []*ChunkMeta
|
chunks []ChunkMeta
|
||||||
|
|
||||||
ref uint32
|
ref uint32
|
||||||
}
|
}
|
||||||
|
@ -677,7 +677,7 @@ func TestBaseChunkSeries(t *testing.T) {
|
||||||
series: []refdSeries{
|
series: []refdSeries{
|
||||||
{
|
{
|
||||||
lset: labels.New([]labels.Label{{"a", "a"}}...),
|
lset: labels.New([]labels.Label{{"a", "a"}}...),
|
||||||
chunks: []*ChunkMeta{
|
chunks: []ChunkMeta{
|
||||||
{Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344},
|
{Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344},
|
||||||
{Ref: 121},
|
{Ref: 121},
|
||||||
},
|
},
|
||||||
|
@ -685,19 +685,19 @@ func TestBaseChunkSeries(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...),
|
lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...),
|
||||||
chunks: []*ChunkMeta{
|
chunks: []ChunkMeta{
|
||||||
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
|
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
|
||||||
},
|
},
|
||||||
ref: 10,
|
ref: 10,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
lset: labels.New([]labels.Label{{"b", "c"}}...),
|
lset: labels.New([]labels.Label{{"b", "c"}}...),
|
||||||
chunks: []*ChunkMeta{{Ref: 8282}},
|
chunks: []ChunkMeta{{Ref: 8282}},
|
||||||
ref: 1,
|
ref: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
lset: labels.New([]labels.Label{{"b", "b"}}...),
|
lset: labels.New([]labels.Label{{"b", "b"}}...),
|
||||||
chunks: []*ChunkMeta{
|
chunks: []ChunkMeta{
|
||||||
{Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269},
|
{Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269},
|
||||||
},
|
},
|
||||||
ref: 108,
|
ref: 108,
|
||||||
|
@ -711,14 +711,14 @@ func TestBaseChunkSeries(t *testing.T) {
|
||||||
series: []refdSeries{
|
series: []refdSeries{
|
||||||
{
|
{
|
||||||
lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...),
|
lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...),
|
||||||
chunks: []*ChunkMeta{
|
chunks: []ChunkMeta{
|
||||||
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
|
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
|
||||||
},
|
},
|
||||||
ref: 10,
|
ref: 10,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
lset: labels.New([]labels.Label{{"b", "c"}}...),
|
lset: labels.New([]labels.Label{{"b", "c"}}...),
|
||||||
chunks: []*ChunkMeta{{Ref: 8282}},
|
chunks: []ChunkMeta{{Ref: 8282}},
|
||||||
ref: 1,
|
ref: 1,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -766,7 +766,7 @@ type itSeries struct {
|
||||||
func (s itSeries) Iterator() SeriesIterator { return s.si }
|
func (s itSeries) Iterator() SeriesIterator { return s.si }
|
||||||
func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
|
func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
|
||||||
|
|
||||||
func chunkFromSamples(s []sample) *ChunkMeta {
|
func chunkFromSamples(s []sample) ChunkMeta {
|
||||||
mint, maxt := int64(0), int64(0)
|
mint, maxt := int64(0), int64(0)
|
||||||
|
|
||||||
if len(s) > 0 {
|
if len(s) > 0 {
|
||||||
|
@ -779,11 +779,10 @@ func chunkFromSamples(s []sample) *ChunkMeta {
|
||||||
for _, s := range s {
|
for _, s := range s {
|
||||||
ca.Append(s.t, s.v)
|
ca.Append(s.t, s.v)
|
||||||
}
|
}
|
||||||
return &ChunkMeta{
|
return ChunkMeta{
|
||||||
MinTime: mint,
|
MinTime: mint,
|
||||||
MaxTime: maxt,
|
MaxTime: maxt,
|
||||||
|
Chunk: c,
|
||||||
Chunk: c,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -945,7 +944,7 @@ func TestSeriesIterator(t *testing.T) {
|
||||||
|
|
||||||
t.Run("Chunk", func(t *testing.T) {
|
t.Run("Chunk", func(t *testing.T) {
|
||||||
for _, tc := range itcases {
|
for _, tc := range itcases {
|
||||||
chkMetas := []*ChunkMeta{
|
chkMetas := []ChunkMeta{
|
||||||
chunkFromSamples(tc.a),
|
chunkFromSamples(tc.a),
|
||||||
chunkFromSamples(tc.b),
|
chunkFromSamples(tc.b),
|
||||||
chunkFromSamples(tc.c),
|
chunkFromSamples(tc.c),
|
||||||
|
@ -1016,7 +1015,7 @@ func TestSeriesIterator(t *testing.T) {
|
||||||
seekcases2 := append(seekcases, extra...)
|
seekcases2 := append(seekcases, extra...)
|
||||||
|
|
||||||
for _, tc := range seekcases2 {
|
for _, tc := range seekcases2 {
|
||||||
chkMetas := []*ChunkMeta{
|
chkMetas := []ChunkMeta{
|
||||||
chunkFromSamples(tc.a),
|
chunkFromSamples(tc.a),
|
||||||
chunkFromSamples(tc.b),
|
chunkFromSamples(tc.b),
|
||||||
chunkFromSamples(tc.c),
|
chunkFromSamples(tc.c),
|
||||||
|
@ -1103,7 +1102,7 @@ func TestSeriesIterator(t *testing.T) {
|
||||||
|
|
||||||
// Regression for: https://github.com/prometheus/tsdb/pull/97
|
// Regression for: https://github.com/prometheus/tsdb/pull/97
|
||||||
func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
|
func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
|
||||||
chkMetas := []*ChunkMeta{
|
chkMetas := []ChunkMeta{
|
||||||
chunkFromSamples([]sample{}),
|
chunkFromSamples([]sample{}),
|
||||||
chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}),
|
chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}),
|
||||||
chunkFromSamples([]sample{{4, 4}, {5, 5}}),
|
chunkFromSamples([]sample{{4, 4}, {5, 5}}),
|
||||||
|
@ -1120,7 +1119,7 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
|
||||||
// Regression when seeked chunks were still found via binary search and we always
|
// Regression when seeked chunks were still found via binary search and we always
|
||||||
// skipped to the end when seeking a value in the current chunk.
|
// skipped to the end when seeking a value in the current chunk.
|
||||||
func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
|
func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
|
||||||
metas := []*ChunkMeta{
|
metas := []ChunkMeta{
|
||||||
chunkFromSamples([]sample{}),
|
chunkFromSamples([]sample{}),
|
||||||
chunkFromSamples([]sample{{1, 2}, {3, 4}, {5, 6}, {7, 8}}),
|
chunkFromSamples([]sample{{1, 2}, {3, 4}, {5, 6}, {7, 8}}),
|
||||||
chunkFromSamples([]sample{}),
|
chunkFromSamples([]sample{}),
|
||||||
|
@ -1141,7 +1140,7 @@ func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
|
||||||
|
|
||||||
func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
|
func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
|
||||||
lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})}
|
lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})}
|
||||||
chunkMetas := [][]*ChunkMeta{
|
chunkMetas := [][]ChunkMeta{
|
||||||
{
|
{
|
||||||
{MinTime: 1, MaxTime: 2, Ref: 1},
|
{MinTime: 1, MaxTime: 2, Ref: 1},
|
||||||
{MinTime: 3, MaxTime: 4, Ref: 2},
|
{MinTime: 3, MaxTime: 4, Ref: 2},
|
||||||
|
@ -1173,7 +1172,7 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
|
||||||
require.False(t, p.Next())
|
require.False(t, p.Next())
|
||||||
|
|
||||||
// Test the case where 1 chunk could cause an unpopulated chunk to be returned.
|
// Test the case where 1 chunk could cause an unpopulated chunk to be returned.
|
||||||
chunkMetas = [][]*ChunkMeta{
|
chunkMetas = [][]ChunkMeta{
|
||||||
{
|
{
|
||||||
{MinTime: 1, MaxTime: 2, Ref: 1},
|
{MinTime: 1, MaxTime: 2, Ref: 1},
|
||||||
},
|
},
|
||||||
|
@ -1193,7 +1192,7 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
|
||||||
|
|
||||||
type mockChunkSeriesSet struct {
|
type mockChunkSeriesSet struct {
|
||||||
l []labels.Labels
|
l []labels.Labels
|
||||||
cm [][]*ChunkMeta
|
cm [][]ChunkMeta
|
||||||
|
|
||||||
i int
|
i int
|
||||||
}
|
}
|
||||||
|
@ -1206,7 +1205,7 @@ func (m *mockChunkSeriesSet) Next() bool {
|
||||||
return m.i < len(m.l)
|
return m.i < len(m.l)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockChunkSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) {
|
func (m *mockChunkSeriesSet) At() (labels.Labels, []ChunkMeta, intervals) {
|
||||||
return m.l[m.i], m.cm[m.i], nil
|
return m.l[m.i], m.cm[m.i], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue