Add main db implementation

pull/5805/head
Fabian Reinartz 8 years ago
parent e79b5b38f4
commit f9f11c4a08

471
db.go

@@ -0,0 +1,471 @@
// Package tsdb implements a time series storage for float64 sample data.
package tsdb
import (
"encoding/binary"
"path/filepath"
"sync"
"time"
"github.com/fabxc/tsdb/chunks"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
)
// DefaultOptions used for the DB.
var DefaultOptions = &Options{
StalenessDelta: 5 * time.Minute,
}
// Options of the DB storage.
type Options struct {
StalenessDelta time.Duration
}
// DB is a time series storage.
type DB struct {
logger log.Logger
opts *Options
memChunks *memChunks
persistence *persistence
indexer *indexer
stopc chan struct{}
}
// Open or create a new DB.
func Open(path string, l log.Logger, opts *Options) (*DB, error) {
if opts == nil {
opts = DefaultOptions
}
indexer, err := newMetricIndexer(filepath.Join(path, "index"), defaultIndexerQsize, defaultIndexerTimeout)
if err != nil {
return nil, err
}
persistence, err := newPersistence(filepath.Join(path, "chunks"), defaultIndexerQsize, defaultIndexerTimeout)
if err != nil {
return nil, err
}
mchunks := newMemChunks(l, indexer, persistence, 10, opts.StalenessDelta)
indexer.mc = mchunks
persistence.mc = mchunks
c := &DB{
logger: l,
opts: opts,
memChunks: mchunks,
persistence: persistence,
indexer: indexer,
stopc: make(chan struct{}),
}
go c.memChunks.run(c.stopc)
return c, nil
}
// Close the storage and persist all writes.
func (c *DB) Close() error {
close(c.stopc)
// TODO(fabxc): blocking further writes here necessary?
c.indexer.wait()
c.persistence.wait()
err0 := c.indexer.close()
err1 := c.persistence.close()
if err0 != nil {
return err0
}
return err1
}
// Append ingests the samples in the scrape into the storage.
func (c *DB) Append(scrape *Scrape) error {
// Sequentially add samples to in-memory chunks.
// TODO(fabxc): evaluate cost of making this atomic.
for _, s := range scrape.m {
if err := c.memChunks.append(s.met, scrape.ts, s.val); err != nil {
// TODO(fabxc): collect in multi error.
return err
}
// TODO(fabxc): increment ingested samples metric.
}
return nil
}
// memChunks holds the chunks that are currently being appended to.
type memChunks struct {
logger log.Logger
stalenessDelta time.Duration
mtx sync.RWMutex
// Chunks by their ID, as accessed when resolving a chunk ID
// returned by an index query.
chunks map[ChunkID]*chunkDesc
// The highest time slice chunks currently have. A new chunk can not
// be in a higher slice before all chunks with lower IDs have been
// added to the slice.
highTime model.Time
// Power-of-2 exponent for the number of chunk shards.
num uint8
// Memory chunks sharded by leading bits of the chunk's metric's
// fingerprints. Used to quickly find chunks for new incoming samples
// where the metric is known but the chunk ID is not.
shards []*memChunksShard
indexer *indexer
persistence *persistence
}
// newMemChunks returns a new memChunks sharded by n locks.
func newMemChunks(l log.Logger, ix *indexer, p *persistence, n uint8, staleness time.Duration) *memChunks {
c := &memChunks{
logger: l,
stalenessDelta: staleness,
num: n,
chunks: map[ChunkID]*chunkDesc{},
persistence: p,
indexer: ix,
}
if n > 63 {
panic("invalid shard power")
}
// Initialize 2^n shards.
for i := 0; i < 1<<n; i++ {
c.shards = append(c.shards, &memChunksShard{
descs: map[model.Fingerprint][]*chunkDesc{},
csize: 1024,
})
}
return c
}
func (mc *memChunks) run(stopc <-chan struct{}) {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
f := func() error {
for _, cs := range mc.shards {
mc.gc(cs)
}
// Wait for persistence and indexing to finish before reindexing
// memory chunks for the new time slice.
mc.persistence.wait()
mc.indexer.wait()
mc.mtx.Lock()
defer mc.mtx.Unlock()
curTimeSlice := timeSlice(model.Now())
// If we have not entered a new time slice yet, there is nothing to do.
if curTimeSlice <= mc.highTime {
return nil
}
ids := make(ChunkIDs, 0, len(mc.chunks))
for id := range mc.chunks {
ids = append(ids, id)
}
if err := mc.indexer.reindexTime(ids, curTimeSlice); err != nil {
return err
}
mc.highTime = curTimeSlice
return nil
}
for {
select {
case <-ticker.C:
if err := f(); err != nil {
mc.logger.With("err", err).Error("memory chunk maintenance failed")
}
case <-stopc:
return
}
}
}
// gc writes stale and incomplete chunks to persistence and removes them
// from the shard.
func (mc *memChunks) gc(cs *memChunksShard) {
// del below mutates the shard, so acquire the write lock.
cs.Lock()
defer cs.Unlock()
mint := model.Now().Add(-mc.stalenessDelta)
for fp, cdescs := range cs.descs {
for _, cd := range cdescs {
// Slots of deleted chunk descs remain nil until reused by get.
if cd == nil {
continue
}
// If the last sample was added before the staleness delta, consider
// the chunk inactive and persist it.
if cd.lastSample.Timestamp.Before(mint) {
mc.persistence.enqueue(cd)
cs.del(fp, cd)
}
}
}
return
}
func (mc *memChunks) append(m model.Metric, ts model.Time, v model.SampleValue) error {
fp := m.FastFingerprint()
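// The leading num bits of the fingerprint select one of the 1<<num shards
// created in newMemChunks.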
cs := mc.shards[fp>>(64-mc.num)]
cs.Lock()
defer cs.Unlock()
chkd, created := cs.get(fp, m)
if created {
mc.indexer.enqueue(chkd)
}
if err := chkd.append(ts, v); err != chunks.ErrChunkFull {
return err
}
// Chunk was full, remove it so a new head chunk can be created.
// TODO(fabxc): should we just remove them during maintenance if we set a 'persisted'
// flag?
// If we shutdown we work down the persistence queue before exiting, so we should
// lose no data. If we crash, the last snapshot will still have the chunk. Theoretically,
// deleting it here should not be a problem.
cs.del(fp, chkd)
mc.persistence.enqueue(chkd)
// Create a new chunk lazily and continue.
chkd, created = cs.get(fp, m)
if !created {
// Bug if the chunk was not newly created.
panic("expected newly created chunk")
}
mc.indexer.enqueue(chkd)
return chkd.append(ts, v)
}
type memChunksShard struct {
sync.RWMutex
// chunks holds chunk descriptors for one or more chunks
// with a given fingerprint.
descs map[model.Fingerprint][]*chunkDesc
csize int
}
// get returns the chunk descriptor for the given fingerprint/metric combination.
// If none exists, a new chunk descriptor is created and true is returned.
func (cs *memChunksShard) get(fp model.Fingerprint, m model.Metric) (*chunkDesc, bool) {
chks := cs.descs[fp]
for _, cd := range chks {
if cd != nil && cd.met.Equal(m) {
return cd, false
}
}
// None of the given chunks was for the metric, create a new one.
cd := &chunkDesc{
met: m,
chunk: chunks.NewPlainChunk(cs.csize),
}
// Try inserting the chunk into an existing hole before appending.
for i, c := range chks {
if c == nil {
chks[i] = cd
return cd, true
}
}
cs.descs[fp] = append(chks, cd)
return cd, true
}
// del clears the slot of the given chunk descriptor for the fingerprint.
func (cs *memChunksShard) del(fp model.Fingerprint, chkd *chunkDesc) {
for i, d := range cs.descs[fp] {
if d == chkd {
cs.descs[fp][i] = nil
return
}
}
}
// ChunkID is a unique identifier for a chunk.
type ChunkID uint64
func (id ChunkID) bytes() []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(id))
return b
}
// ChunkIDs is a sortable list of chunk IDs.
type ChunkIDs []ChunkID
func (c ChunkIDs) Len() int { return len(c) }
func (c ChunkIDs) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c ChunkIDs) Less(i, j int) bool { return c[i] < c[j] }
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct {
id ChunkID
met model.Metric
chunk chunks.Chunk
// Caching fields.
firstTime model.Time
lastSample model.SamplePair
app chunks.Appender // Current appender for the chunk.
}
func (cd *chunkDesc) append(ts model.Time, v model.SampleValue) error {
if cd.app == nil {
cd.app = cd.chunk.Appender()
// TODO(fabxc): set correctly once loading from snapshot is added.
cd.firstTime = ts
}
cd.lastSample.Timestamp = ts
cd.lastSample.Value = v
return cd.app.Append(ts, v)
}
// Scrape gathers samples for a single timestamp.
type Scrape struct {
ts model.Time
m []sample
}
type sample struct {
met model.Metric
val model.SampleValue
}
// Reset resets the scrape data and initializes it for a new scrape at
// the given time. The underlying memory remains allocated for the next scrape.
func (s *Scrape) Reset(ts model.Time) {
s.ts = ts
s.m = s.m[:0]
}
// Dump returns all samples that are part of the scrape.
func (s *Scrape) Dump() []*model.Sample {
d := make([]*model.Sample, 0, len(s.m))
for _, sa := range s.m {
d = append(d, &model.Sample{
Metric: sa.met,
Timestamp: s.ts,
Value: sa.val,
})
}
return d
}
// Add adds a sample value for the given metric to the scrape.
func (s *Scrape) Add(m model.Metric, v model.SampleValue) {
for ln, lv := range m {
if len(lv) == 0 {
delete(m, ln)
}
}
// TODO(fabxc): pre-sort added samples into the correct buckets
// of fingerprint shards so we only have to lock each memChunkShard once.
s.m = append(s.m, sample{met: m, val: v})
}
type chunkBatchProcessor struct {
processf func(...*chunkDesc) error
mtx sync.RWMutex
logger log.Logger
q []*chunkDesc
qcap int
timeout time.Duration
timer *time.Timer
trigger chan struct{}
empty chan struct{}
}
func newChunkBatchProcessor(l log.Logger, cap int, to time.Duration) *chunkBatchProcessor {
if l == nil {
l = log.NewNopLogger()
}
p := &chunkBatchProcessor{
logger: l,
qcap: cap,
timeout: to,
timer: time.NewTimer(to),
trigger: make(chan struct{}, 1),
empty: make(chan struct{}),
}
// Start with a closed channel so we don't block on wait if nothing
// has ever been enqueued.
close(p.empty)
go p.run()
return p
}
func (p *chunkBatchProcessor) run() {
for {
// Process the pending batch if triggered or if the timeout since
// the last processing run has passed.
select {
case <-p.trigger:
case <-p.timer.C:
}
if err := p.process(); err != nil {
p.logger.
With("err", err).With("num", len(p.q)).
Error("batch failed, dropping chunks descs")
}
}
}
func (p *chunkBatchProcessor) process() error {
// TODO(fabxc): locking the entire time will cause lock contention.
p.mtx.Lock()
defer p.mtx.Unlock()
if len(p.q) == 0 {
return nil
}
// Drop the chunk descs from the queue whether processing succeeded or not.
defer func() {
p.q = p.q[:0]
close(p.empty)
}()
return p.processf(p.q...)
}
func (p *chunkBatchProcessor) enqueue(cds ...*chunkDesc) {
p.mtx.Lock()
defer p.mtx.Unlock()
if len(p.q) == 0 {
p.timer.Reset(p.timeout)
p.empty = make(chan struct{})
}
p.q = append(p.q, cds...)
if len(p.q) > p.qcap {
select {
case p.trigger <- struct{}{}:
default:
// If we cannot send, a trigger signal is already pending.
}
}
}
// wait blocks until the queue becomes empty.
func (p *chunkBatchProcessor) wait() {
p.mtx.RLock()
c := p.empty
p.mtx.RUnlock()
<-c
}
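For orientation, a minimal usage sketch of the write-path API added in db.go. This is an illustration, not part of the commit: the root import path (github.com/fabxc/tsdb) is inferred from the subpackage imports above, and the storage path and labels are made up; error handling is collapsed into panics.

package main

import (
	"github.com/fabxc/tsdb"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"
)

func main() {
	// Open or create the storage directory with default options.
	db, err := tsdb.Open("/tmp/tsdb-example", log.Base(), nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// A Scrape collects all samples of one scrape at a single timestamp
	// and is reset and reused between scrapes.
	var s tsdb.Scrape
	s.Reset(model.Now())
	s.Add(model.Metric{"__name__": "example_requests_total", "job": "example"}, 1)
	s.Add(model.Metric{"__name__": "example_queue_length", "job": "example"}, 42)

	// Append writes the scrape into the in-memory chunks; indexing and
	// persistence happen asynchronously in the background.
	if err := db.Append(&s); err != nil {
		panic(err)
	}
}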

@@ -0,0 +1,145 @@
package tsdb
import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"testing"
"time"
"github.com/fabxc/tindex"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/cinamon/chunk"
"github.com/prometheus/prometheus/storage/metric"
"github.com/stretchr/testify/require"
)
func TestE2E(t *testing.T) {
dir, err := ioutil.TempDir("", "cinamon_test")
require.NoError(t, err)
defer os.RemoveAll(dir)
c, err := Open(dir, log.Base(), nil)
require.NoError(t, err)
c.memChunks.indexer.timeout = 50 * time.Millisecond
// Set indexer size to be triggered exactly when we hit the limit.
// c.memChunks.indexer.qmax = 10
mets := generateMetrics(100000)
// var wg sync.WaitGroup
// for k := 0; k < len(mets)/100+1; k++ {
// wg.Add(1)
// go func(mets []model.Metric) {
var s Scrape
for i := 0; i < 2*64; i++ {
s.Reset(model.Time(i) * 100000)
for _, m := range mets {
s.Add(m, model.SampleValue(rand.Float64()))
}
require.NoError(t, c.Append(&s))
}
// wg.Done()
// }(mets[k*100 : (k+1)*100])
// }
// wg.Wait()
start := time.Now()
c.memChunks.indexer.wait()
fmt.Println("index wait", time.Since(start))
start = time.Now()
q, err := c.Querier()
require.NoError(t, err)
defer q.Close()
m1, err := metric.NewLabelMatcher(metric.Equal, "job", "somejob")
require.NoError(t, err)
m2, err := metric.NewLabelMatcher(metric.Equal, "label2", "value0")
require.NoError(t, err)
m3, err := metric.NewLabelMatcher(metric.Equal, "label4", "value0")
require.NoError(t, err)
it, err := q.Iterator(m1, m2, m3)
require.NoError(t, err)
res, err := tindex.ExpandIterator(it)
require.NoError(t, err)
fmt.Println("result len", len(res))
fmt.Println("querying", time.Since(start))
}
func generateMetrics(n int) (res []model.Metric) {
for i := 0; i < n; i++ {
res = append(res, model.Metric{
"job": "somejob",
"label5": model.LabelValue(fmt.Sprintf("value%d", i%10)),
"label4": model.LabelValue(fmt.Sprintf("value%d", i%5)),
"label3": model.LabelValue(fmt.Sprintf("value%d", i%3)),
"label2": model.LabelValue(fmt.Sprintf("value%d", i%2)),
"label1": model.LabelValue(fmt.Sprintf("value%d", i)),
})
}
return res
}
func TestMemChunksShardGet(t *testing.T) {
cs := &memChunksShard{
descs: map[model.Fingerprint][]*chunkDesc{},
csize: 100,
}
cdesc1, created1 := cs.get(123, model.Metric{"x": "1"})
require.True(t, created1)
require.Equal(t, 1, len(cs.descs[123]))
require.Equal(t, &chunkDesc{
met: model.Metric{"x": "1"},
chunk: chunks.NewPlainChunk(100),
}, cdesc1)
// Add colliding metric.
cdesc2, created2 := cs.get(123, model.Metric{"x": "2"})
require.True(t, created2)
require.Equal(t, 2, len(cs.descs[123]))
require.Equal(t, &chunkDesc{
met: model.Metric{"x": "2"},
chunk: chunks.NewPlainChunk(100),
}, cdesc2)
// First chunk desc can still be retrieved correctly.
cdesc1, created1 = cs.get(123, model.Metric{"x": "1"})
require.False(t, created1)
require.Equal(t, &chunkDesc{
met: model.Metric{"x": "1"},
chunk: chunks.NewPlainChunk(100),
}, cdesc1)
}
func TestChunkSeriesIterator(t *testing.T) {
newChunk := func(s []model.SamplePair) chunks.Chunk {
c := chunks.NewPlainChunk(1000)
app := c.Appender()
for _, sp := range s {
if err := app.Append(sp.Timestamp, sp.Value); err != nil {
t.Fatal(err)
}
}
return c
}
it := newChunkSeriesIterator(metric.Metric{}, []chunks.Chunk{
newChunk([]model.SamplePair{{1, 1}, {2, 2}, {3, 3}}),
newChunk([]model.SamplePair{{4, 4}, {5, 5}, {6, 6}}),
newChunk([]model.SamplePair{{7, 7}, {8, 8}, {9, 9}}),
})
var res []model.SamplePair
for sp, ok := it.Seek(0); ok; sp, ok = it.Next() {
fmt.Println(sp)
res = append(res, sp)
}
require.Equal(t, io.EOF, it.Err())
require.Equal(t, []model.SamplePair{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 9}}, res)
}

@@ -0,0 +1,130 @@
package tsdb
import (
"sort"
"strconv"
"sync/atomic"
"time"
"github.com/fabxc/tsdb/index"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
)
const (
defaultIndexerTimeout = 1 * time.Second
defaultIndexerQsize = 500000
)
// indexer asynchronously indexes chunks in batches. It indexes all labels
// of a chunk with a forward mapping and additionally indexes the chunk for
// the time slice of its first sample.
type indexer struct {
*chunkBatchProcessor
ix *index.Index
mc *memChunks
}
// newMetricIndexer returns a batch indexer that creates new index documents
// and indexes them by their metric's label fields.
// Its post-indexing hook populates the in-memory chunk forward index.
func newMetricIndexer(path string, qsz int, qto time.Duration) (*indexer, error) {
ix, err := index.Open(path, nil)
if err != nil {
return nil, err
}
i := &indexer{
ix: ix,
chunkBatchProcessor: newChunkBatchProcessor(log.Base(), qsz, qto),
}
i.chunkBatchProcessor.processf = i.index
return i, nil
}
func (ix *indexer) Querier() (*index.Querier, error) {
return ix.ix.Querier()
}
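// Chunks are additionally indexed under the reserved __ts__ field by the
// 3-hour time slice that contains their first sample.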
const (
timeSliceField = "__ts__"
timeSliceSize = 3 * time.Hour
)
func timeSlice(t model.Time) model.Time {
return t - (t % model.Time(timeSliceSize/time.Millisecond))
}
func timeString(t model.Time) string {
return strconv.FormatInt(int64(t), 16)
}
func (ix *indexer) close() error {
return ix.ix.Close()
}
func (ix *indexer) index(cds ...*chunkDesc) error {
b, err := ix.ix.Batch()
if err != nil {
return err
}
ids := make([]ChunkID, len(cds))
for i, cd := range cds {
terms := make(index.Terms, 0, len(cd.met))
for k, v := range cd.met {
t := index.Term{Field: string(k), Val: string(v)}
terms = append(terms, t)
}
id := b.Add(terms)
ts := timeSlice(cd.firstTime)
// If the chunk belongs to a higher time slice than the current high one,
// don't index it yet. It will be indexed when the next time slice
// is initiated over all memory chunks.
if ts <= ix.mc.highTime {
b.SecondaryIndex(id, index.Term{
Field: timeSliceField,
Val: timeString(ts),
})
}
ids[i] = ChunkID(id)
}
if err := b.Commit(); err != nil {
return err
}
// We have to lock here already instead of post-commit as otherwise we might
// generate new chunk IDs, skip their indexing, and have reindexTime called
// before the chunk IDs are visible.
// TODO(fabxc): move back up
ix.mc.mtx.Lock()
defer ix.mc.mtx.Unlock()
// Make in-memory chunks visible for read.
for i, cd := range cds {
atomic.StoreUint64((*uint64)(&cd.id), uint64(ids[i]))
ix.mc.chunks[cd.id] = cd
}
return nil
}
// reindexTime creates an initial time slice index over all chunk IDs.
// Any future chunks indexed for the same time slice must have higher IDs.
func (ix *indexer) reindexTime(ids ChunkIDs, ts model.Time) error {
b, err := ix.ix.Batch()
if err != nil {
return err
}
sort.Sort(ids)
t := index.Term{Field: timeSliceField, Val: timeString(ts)}
for _, id := range ids {
b.SecondaryIndex(index.DocID(id), t)
}
return b.Commit()
}
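To make the time slicing concrete, a small standalone sketch of the bucketing done by timeSlice and timeString; the timestamp is an arbitrary example value, not from the commit.

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/prometheus/common/model"
)

func main() {
	// 3h slice size in milliseconds, matching timeSliceSize.
	sliceMs := int64(3 * time.Hour / time.Millisecond) // 10,800,000

	t := model.TimeFromUnix(1500000000)
	slice := t - (t % model.Time(sliceMs))

	// Every timestamp within the same 3h window maps to the same slice
	// start, which is stored as a hexadecimal term under "__ts__".
	fmt.Println(slice, strconv.FormatInt(int64(slice), 16))
}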

@@ -0,0 +1,163 @@
package tsdb
import (
"encoding/binary"
"os"
"path/filepath"
"time"
"github.com/boltdb/bolt"
"github.com/fabxc/pagebuf"
"github.com/prometheus/common/log"
)
type persistence struct {
*chunkBatchProcessor
mc *memChunks
chunks *pagebuf.DB
index *bolt.DB
}
func newPersistence(path string, cap int, to time.Duration) (*persistence, error) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
}
ix, err := bolt.Open(filepath.Join(path, "ix"), 0666, nil)
if err != nil {
return nil, err
}
if err := ix.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bktChunks)
return err
}); err != nil {
return nil, err
}
pb, err := pagebuf.Open(filepath.Join(path, "chunks"), 0666, nil)
if err != nil {
return nil, err
}
p := &persistence{
chunks: pb,
index: ix,
chunkBatchProcessor: newChunkBatchProcessor(log.Base(), cap, to),
}
p.chunkBatchProcessor.processf = p.persist
return p, nil
}
var bktChunks = []byte("chunks")
func (p *persistence) close() error {
// Index must be closed first, otherwise we might deadlock.
err0 := p.index.Close()
err1 := p.chunks.Close()
if err0 != nil {
return err0
}
return err1
}
func (p *persistence) persist(cds ...*chunkDesc) error {
err := p.update(func(tx *persistenceTx) error {
bkt := tx.ix.Bucket(bktChunks)
for _, cd := range cds {
pos, err := tx.chunks.Add(cd.chunk.Data())
if err != nil {
return err
}
var buf [16]byte
binary.BigEndian.PutUint64(buf[:8], uint64(cd.id))
binary.BigEndian.PutUint64(buf[8:], pos)
if err := bkt.Put(buf[:8], buf[8:]); err != nil {
return err
}
tx.ids = append(tx.ids, cd.id)
}
return nil
})
return err
}
func (p *persistence) update(f func(*persistenceTx) error) error {
tx, err := p.begin(true)
if err != nil {
return err
}
if err := f(tx); err != nil {
tx.rollback()
return err
}
return tx.commit()
}
func (p *persistence) view(f func(*persistenceTx) error) error {
tx, err := p.begin(false)
if err != nil {
return err
}
if err := f(tx); err != nil {
tx.rollback()
return err
}
return tx.rollback()
}
func (p *persistence) begin(writeable bool) (*persistenceTx, error) {
var err error
tx := &persistenceTx{p: p}
// Index transaction is the outer one so we might end up with orphaned
// chunks but never with dangling pointers in the index.
tx.ix, err = p.index.Begin(writeable)
if err != nil {
return nil, err
}
tx.chunks, err = p.chunks.Begin(writeable)
if err != nil {
tx.ix.Rollback()
return nil, err
}
return tx, nil
}
type persistenceTx struct {
p *persistence
ix *bolt.Tx
chunks *pagebuf.Tx
ids []ChunkID
}
func (tx *persistenceTx) commit() error {
if err := tx.chunks.Commit(); err != nil {
tx.ix.Rollback()
return err
}
if err := tx.ix.Commit(); err != nil {
// TODO(fabxc): log orphaned chunks. What about overwritten ones?
// Should we not allow delete and add in the same tx so this cannot happen?
return err
}
// Successfully persisted chunks, clear them from the in-memory
// forward mapping.
tx.p.mc.mtx.Lock()
defer tx.p.mc.mtx.Unlock()
for _, id := range tx.ids {
delete(tx.p.mc.chunks, id)
}
return nil
}
func (tx *persistenceTx) rollback() error {
err0 := tx.chunks.Rollback()
err1 := tx.ix.Rollback()
if err0 != nil {
return err0
}
return err1
}
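A small sketch of the record layout that persist writes into the chunks bucket and Querier.chunk later reads back: the 8-byte big-endian chunk ID keys the 8-byte big-endian pagebuf position of the chunk data. encodeChunkRecord is a hypothetical helper for illustration only, not part of the commit.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeChunkRecord builds the key/value pair stored in the bolt "chunks"
// bucket: key = chunk ID, value = position of the chunk data in pagebuf.
func encodeChunkRecord(id, pos uint64) (key, val []byte) {
	var buf [16]byte
	binary.BigEndian.PutUint64(buf[:8], id)
	binary.BigEndian.PutUint64(buf[8:], pos)
	return buf[:8], buf[8:]
}

func main() {
	key, val := encodeChunkRecord(42, 4096)
	fmt.Printf("key=%x value=%x\n", key, val)
	// Querier.chunk recovers the position with binary.BigEndian.Uint64(value).
	fmt.Println("pos:", binary.BigEndian.Uint64(val))
}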

@@ -0,0 +1,248 @@
package tsdb
import (
"encoding/binary"
"errors"
"fmt"
"io"
"github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/index"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
)
// SeriesIterator provides iteration over a time series associated with a metric.
type SeriesIterator interface {
Metric() metric.Metric
Seek(model.Time) (model.SamplePair, bool)
Next() (model.SamplePair, bool)
Err() error
}
type chunkSeriesIterator struct {
m metric.Metric
chunks []chunks.Chunk
err error
cur chunks.Iterator
curPos int
}
func newChunkSeriesIterator(m metric.Metric, chunks []chunks.Chunk) *chunkSeriesIterator {
return &chunkSeriesIterator{
m: m,
chunks: chunks,
}
}
func (it *chunkSeriesIterator) Metric() metric.Metric {
return it.m
}
func (it *chunkSeriesIterator) Seek(ts model.Time) (model.SamplePair, bool) {
// Naively go through all chunks' first timestamps and pick the chunk
// containing the seeked timestamp.
// TODO(fabxc): this can be made smarter if it's a bottleneck.
for i, chk := range it.chunks {
cit := chk.Iterator()
first, ok := cit.First()
if !ok {
it.err = cit.Err()
return model.SamplePair{}, false
}
// Always enter the first chunk so that seeking to a timestamp before
// the first sample does not leave the iterator uninitialized.
if i > 0 && first.Timestamp > ts {
break
}
it.cur = cit
it.curPos = i
}
return it.cur.Seek(ts)
}
func (it *chunkSeriesIterator) Next() (model.SamplePair, bool) {
sp, ok := it.cur.Next()
if ok {
return sp, true
}
if it.cur.Err() != io.EOF {
it.err = it.cur.Err()
return model.SamplePair{}, false
}
if len(it.chunks) == it.curPos+1 {
it.err = io.EOF
return model.SamplePair{}, false
}
it.curPos++
it.cur = it.chunks[it.curPos].Iterator()
// Return the first sample of the new chunk.
return it.cur.Seek(0)
}
func (it *chunkSeriesIterator) Err() error {
return it.err
}
// Querier allows several queries over the storage with a consistent view of the data.
type Querier struct {
db *DB
iq *index.Querier
}
// Querier returns a new Querier on the index at the current point in time.
func (db *DB) Querier() (*Querier, error) {
iq, err := db.indexer.Querier()
if err != nil {
return nil, err
}
return &Querier{db: db, iq: iq}, nil
}
// Close the querier. This invalidates all previously retrieved iterators.
func (q *Querier) Close() error {
return q.iq.Close()
}
// Iterator returns an iterator over all chunks that match all given
// label matchers. The iterator is only valid until the Querier is closed.
func (q *Querier) Iterator(matchers ...*metric.LabelMatcher) (index.Iterator, error) {
var its []index.Iterator
for _, m := range matchers {
var matcher index.Matcher
switch m.Type {
case metric.Equal:
matcher = index.NewEqualMatcher(string(m.Value))
case metric.RegexMatch:
var err error
matcher, err = index.NewRegexpMatcher(string(m.Value))
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("matcher type %q not supported", m.Type)
}
it, err := q.iq.Search(string(m.Name), matcher)
if err != nil {
return nil, err
}
if it != nil {
its = append(its, it)
}
}
if len(its) == 0 {
return nil, errors.New("not found")
}
return index.Intersect(its...), nil
}
// RangeIterator returns an iterator over chunks that are present in the given time range.
// The returned iterator is only valid until the querier is closed.
func (q *Querier) RangeIterator(start, end model.Time) (index.Iterator, error) {
return nil, nil
}
// InstantIterator returns an iterator over chunks possibly containing values for
// the given timestamp. The returned iterator is only valid until the querier is closed.
func (q *Querier) InstantIterator(at model.Time) (index.Iterator, error) {
return nil, nil
}
// Series returns a list of series iterators over all chunks in the given iterator.
// The returned series iterators are only valid until the querier is closed.
func (q *Querier) Series(it index.Iterator) ([]SeriesIterator, error) {
mets := map[model.Fingerprint]metric.Metric{}
its := map[model.Fingerprint][]chunks.Chunk{}
id, err := it.Seek(0)
for ; err == nil; id, err = it.Next() {
terms, err := q.iq.Doc(id)
if err != nil {
return nil, err
}
met := make(model.Metric, len(terms))
for _, t := range terms {
met[model.LabelName(t.Field)] = model.LabelValue(t.Val)
}
fp := met.Fingerprint()
chunk, err := q.chunk(ChunkID(id))
if err != nil {
return nil, err
}
its[fp] = append(its[fp], chunk)
if _, ok := mets[fp]; ok {
continue
}
mets[fp] = metric.Metric{Metric: met, Copied: true}
}
if err != io.EOF {
return nil, err
}
res := make([]SeriesIterator, 0, len(its))
for fp, chks := range its {
res = append(res, newChunkSeriesIterator(mets[fp], chks))
}
return res, nil
}
func (q *Querier) chunk(id ChunkID) (chunks.Chunk, error) {
q.db.memChunks.mtx.RLock()
cd, ok := q.db.memChunks.chunks[id]
q.db.memChunks.mtx.RUnlock()
if ok {
return cd.chunk, nil
}
var chk chunks.Chunk
// TODO(fabxc): this starts a new read transaction for every
// chunk we have to load from persistence.
// Figure out what's best tradeoff between lock contention and
// data consistency: start transaction when instantiating the querier
// or lazily start transaction on first try. (Not all query operations
// need access to persisted chunks.)
err := q.db.persistence.view(func(tx *persistenceTx) error {
chks := tx.ix.Bucket(bktChunks)
ptr := chks.Get(id.bytes())
if ptr == nil {
return fmt.Errorf("chunk pointer for ID %d not found", id)
}
cdata, err := tx.chunks.Get(binary.BigEndian.Uint64(ptr))
if err != nil {
return fmt.Errorf("get chunk data for ID %d: %s", id, err)
}
chk, err = chunks.FromData(cdata)
return err
})
return chk, err
}
// Metrics returns the unique metrics found across all chunks in the provided iterator.
func (q *Querier) Metrics(it index.Iterator) ([]metric.Metric, error) {
m := []metric.Metric{}
fps := map[model.Fingerprint]struct{}{}
id, err := it.Seek(0)
for ; err == nil; id, err = it.Next() {
terms, err := q.iq.Doc(id)
if err != nil {
return nil, err
}
met := make(model.Metric, len(terms))
for _, t := range terms {
met[model.LabelName(t.Field)] = model.LabelValue(t.Val)
}
fp := met.Fingerprint()
if _, ok := fps[fp]; ok {
continue
}
fps[fp] = struct{}{}
m = append(m, metric.Metric{Metric: met, Copied: true})
}
if err != io.EOF {
return nil, err
}
return m, nil
}
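And a matching read-path sketch using the Querier added here; as in the earlier sketch, the root import path, storage path, and label values are assumptions for illustration, and error handling is collapsed into panics.

package main

import (
	"fmt"

	"github.com/fabxc/tsdb"
	"github.com/prometheus/common/log"
	"github.com/prometheus/prometheus/storage/metric"
)

func main() {
	db, err := tsdb.Open("/tmp/tsdb-example", log.Base(), nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// A Querier provides a consistent view over the index until it is closed.
	q, err := db.Querier()
	if err != nil {
		panic(err)
	}
	defer q.Close()

	// Select all chunks whose metric matches job="example" and expand
	// them into per-series iterators.
	m, err := metric.NewLabelMatcher(metric.Equal, "job", "example")
	if err != nil {
		panic(err)
	}
	it, err := q.Iterator(m)
	if err != nil {
		panic(err)
	}
	series, err := q.Series(it)
	if err != nil {
		panic(err)
	}
	for _, s := range series {
		fmt.Println(s.Metric())
		for sp, ok := s.Seek(0); ok; sp, ok = s.Next() {
			fmt.Println(sp.Timestamp, sp.Value)
		}
	}
}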