diff --git a/db.go b/db.go new file mode 100644 index 000000000..065205760 --- /dev/null +++ b/db.go @@ -0,0 +1,471 @@ +// Package tsdb implements a time series storage for float64 sample data. +package tsdb + +import ( + "encoding/binary" + "path/filepath" + "sync" + "time" + + "github.com/fabxc/tsdb/chunks" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" +) + +// DefaultOptions used for the DB. +var DefaultOptions = &Options{ + StalenessDelta: 5 * time.Minute, +} + +// Options of the DB storage. +type Options struct { + StalenessDelta time.Duration +} + +// DB is a time series storage. +type DB struct { + logger log.Logger + opts *Options + + memChunks *memChunks + persistence *persistence + indexer *indexer + stopc chan struct{} +} + +// Open or create a new DB. +func Open(path string, l log.Logger, opts *Options) (*DB, error) { + if opts == nil { + opts = DefaultOptions + } + + indexer, err := newMetricIndexer(filepath.Join(path, "index"), defaultIndexerQsize, defaultIndexerTimeout) + if err != nil { + return nil, err + } + persistence, err := newPersistence(filepath.Join(path, "chunks"), defaultIndexerQsize, defaultIndexerTimeout) + if err != nil { + return nil, err + } + + mchunks := newMemChunks(l, indexer, persistence, 10, opts.StalenessDelta) + indexer.mc = mchunks + persistence.mc = mchunks + + c := &DB{ + logger: l, + opts: opts, + memChunks: mchunks, + persistence: persistence, + indexer: indexer, + stopc: make(chan struct{}), + } + go c.memChunks.run(c.stopc) + + return c, nil +} + +// Close the storage and persist all writes. +func (c *DB) Close() error { + close(c.stopc) + // TODO(fabxc): blocking further writes here necessary? + c.indexer.wait() + c.persistence.wait() + + err0 := c.indexer.close() + err1 := c.persistence.close() + if err0 != nil { + return err0 + } + return err1 +} + +// Append ingestes the samples in the scrape into the storage. +func (c *DB) Append(scrape *Scrape) error { + // Sequentially add samples to in-memory chunks. + // TODO(fabxc): evaluate cost of making this atomic. + for _, s := range scrape.m { + if err := c.memChunks.append(s.met, scrape.ts, s.val); err != nil { + // TODO(fabxc): collect in multi error. + return err + } + // TODO(fabxc): increment ingested samples metric. + } + return nil +} + +// memChunks holds the chunks that are currently being appended to. +type memChunks struct { + logger log.Logger + stalenessDelta time.Duration + + mtx sync.RWMutex + // Chunks by their ID as accessed when retrieving a chunk ID from + // an index query. + chunks map[ChunkID]*chunkDesc + // The highest time slice chunks currently have. A new chunk can not + // be in a higher slice before all chunks with lower IDs have been + // added to the slice. + highTime model.Time + + // Power of 2 of chunk shards. + num uint8 + // Memory chunks sharded by leading bits of the chunk's metric's + // fingerprints. Used to quickly find chunks for new incoming samples + // where the metric is known but the chunk ID is not. + shards []*memChunksShard + + indexer *indexer + persistence *persistence +} + +// newMemChunks returns a new memChunks sharded by n locks. +func newMemChunks(l log.Logger, ix *indexer, p *persistence, n uint8, staleness time.Duration) *memChunks { + c := &memChunks{ + logger: l, + stalenessDelta: staleness, + num: n, + chunks: map[ChunkID]*chunkDesc{}, + persistence: p, + indexer: ix, + } + + if n > 63 { + panic("invalid shard power") + } + + // Initialize 2^n shards. + for i := 0; i < 1<>(64-mc.num)] + + cs.Lock() + defer cs.Unlock() + + chkd, created := cs.get(fp, m) + if created { + mc.indexer.enqueue(chkd) + } + if err := chkd.append(ts, v); err != chunks.ErrChunkFull { + return err + } + // Chunk was full, remove it so a new head chunk can be created. + // TODO(fabxc): should we just remove them during maintenance if we set a 'persisted' + // flag? + // If we shutdown we work down the persistence queue before exiting, so we should + // lose no data. If we crash, the last snapshot will still have the chunk. Theoretically, + // deleting it here should not be a problem. + cs.del(fp, chkd) + + mc.persistence.enqueue(chkd) + + // Create a new chunk lazily and continue. + chkd, created = cs.get(fp, m) + if !created { + // Bug if the chunk was not newly created. + panic("expected newly created chunk") + } + mc.indexer.enqueue(chkd) + + return chkd.append(ts, v) +} + +type memChunksShard struct { + sync.RWMutex + + // chunks holds chunk descriptors for one or more chunks + // with a given fingerprint. + descs map[model.Fingerprint][]*chunkDesc + csize int +} + +// get returns the chunk descriptor for the given fingerprint/metric combination. +// If none exists, a new chunk descriptor is created and true is returned. +func (cs *memChunksShard) get(fp model.Fingerprint, m model.Metric) (*chunkDesc, bool) { + chks := cs.descs[fp] + for _, cd := range chks { + if cd != nil && cd.met.Equal(m) { + return cd, false + } + } + // None of the given chunks was for the metric, create a new one. + cd := &chunkDesc{ + met: m, + chunk: chunks.NewPlainChunk(cs.csize), + } + // Try inserting chunk in existing whole before appending. + for i, c := range chks { + if c == nil { + chks[i] = cd + return cd, true + } + } + cs.descs[fp] = append(chks, cd) + return cd, true +} + +// del frees the field of the chunk descriptor for the fingerprint. +func (cs *memChunksShard) del(fp model.Fingerprint, chkd *chunkDesc) { + for i, d := range cs.descs[fp] { + if d == chkd { + cs.descs[fp][i] = nil + return + } + } +} + +// ChunkID is a unique identifier for a chunks. +type ChunkID uint64 + +func (id ChunkID) bytes() []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(id)) + return b +} + +// ChunkIDs is a sortable list of chunk IDs. +type ChunkIDs []ChunkID + +func (c ChunkIDs) Len() int { return len(c) } +func (c ChunkIDs) Swap(i, j int) { c[i], c[j] = c[j], c[i] } +func (c ChunkIDs) Less(i, j int) bool { return c[i] < c[j] } + +// chunkDesc wraps a plain data chunk and provides cached meta data about it. +type chunkDesc struct { + id ChunkID + met model.Metric + chunk chunks.Chunk + + // Caching fields. + firstTime model.Time + lastSample model.SamplePair + + app chunks.Appender // Current appender for the chunks. +} + +func (cd *chunkDesc) append(ts model.Time, v model.SampleValue) error { + if cd.app == nil { + cd.app = cd.chunk.Appender() + // TODO(fabxc): set correctly once loading from snapshot is added. + cd.firstTime = ts + } + cd.lastSample.Timestamp = ts + cd.lastSample.Value = v + + return cd.app.Append(ts, v) +} + +// Scrape gathers samples for a single timestamp. +type Scrape struct { + ts model.Time + m []sample +} + +type sample struct { + met model.Metric + val model.SampleValue +} + +// Reset resets the scrape data and initializes it for a new scrape at +// the given time. The underlying memory remains allocated for the next scrape. +func (s *Scrape) Reset(ts model.Time) { + s.ts = ts + s.m = s.m[:0] +} + +// Dump returns all samples that are part of the scrape. +func (s *Scrape) Dump() []*model.Sample { + d := make([]*model.Sample, 0, len(s.m)) + for _, sa := range s.m { + d = append(d, &model.Sample{ + Metric: sa.met, + Timestamp: s.ts, + Value: sa.val, + }) + } + return d +} + +// Add adds a sample value for the given metric to the scrape. +func (s *Scrape) Add(m model.Metric, v model.SampleValue) { + for ln, lv := range m { + if len(lv) == 0 { + delete(m, ln) + } + } + // TODO(fabxc): pre-sort added samples into the correct buckets + // of fingerprint shards so we only have to lock each memChunkShard once. + s.m = append(s.m, sample{met: m, val: v}) +} + +type chunkBatchProcessor struct { + processf func(...*chunkDesc) error + + mtx sync.RWMutex + logger log.Logger + q []*chunkDesc + + qcap int + timeout time.Duration + + timer *time.Timer + trigger chan struct{} + empty chan struct{} +} + +func newChunkBatchProcessor(l log.Logger, cap int, to time.Duration) *chunkBatchProcessor { + if l == nil { + l = log.NewNopLogger() + } + p := &chunkBatchProcessor{ + logger: l, + qcap: cap, + timeout: to, + timer: time.NewTimer(to), + trigger: make(chan struct{}, 1), + empty: make(chan struct{}), + } + // Start with closed channel so we don't block on wait if nothing + // has ever been indexed. + close(p.empty) + + go p.run() + return p +} + +func (p *chunkBatchProcessor) run() { + for { + // Process pending indexing batch if triggered + // or timeout since last indexing has passed. + select { + case <-p.trigger: + case <-p.timer.C: + } + + if err := p.process(); err != nil { + p.logger. + With("err", err).With("num", len(p.q)). + Error("batch failed, dropping chunks descs") + } + } +} + +func (p *chunkBatchProcessor) process() error { + // TODO(fabxc): locking the entire time will cause lock contention. + p.mtx.Lock() + defer p.mtx.Unlock() + + if len(p.q) == 0 { + return nil + } + // Leave chunk descs behind whether successful or not. + defer func() { + p.q = p.q[:0] + close(p.empty) + }() + + return p.processf(p.q...) +} + +func (p *chunkBatchProcessor) enqueue(cds ...*chunkDesc) { + p.mtx.Lock() + defer p.mtx.Unlock() + + if len(p.q) == 0 { + p.timer.Reset(p.timeout) + p.empty = make(chan struct{}) + } + + p.q = append(p.q, cds...) + if len(p.q) > p.qcap { + select { + case p.trigger <- struct{}{}: + default: + // If we cannot send a signal is already set. + } + } +} + +// wait blocks until the queue becomes empty. +func (p *chunkBatchProcessor) wait() { + p.mtx.RLock() + c := p.empty + p.mtx.RUnlock() + <-c +} diff --git a/db_test.go b/db_test.go new file mode 100644 index 000000000..0436a3033 --- /dev/null +++ b/db_test.go @@ -0,0 +1,145 @@ +package tsdb + +import ( + "fmt" + "io" + "io/ioutil" + "math/rand" + "os" + "testing" + "time" + + "github.com/fabxc/tindex" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/cinamon/chunk" + "github.com/prometheus/prometheus/storage/metric" + "github.com/stretchr/testify/require" +) + +func TestE2E(t *testing.T) { + dir, err := ioutil.TempDir("", "cinamon_test") + require.NoError(t, err) + defer os.RemoveAll(dir) + + c, err := Open(dir, log.Base(), nil) + require.NoError(t, err) + + c.memChunks.indexer.timeout = 50 * time.Millisecond + + // Set indexer size to be triggered exactly when we hit the limit. + // c.memChunks.indexer.qmax = 10 + + mets := generateMetrics(100000) + // var wg sync.WaitGroup + // for k := 0; k < len(mets)/100+1; k++ { + // wg.Add(1) + // go func(mets []model.Metric) { + var s Scrape + for i := 0; i < 2*64; i++ { + s.Reset(model.Time(i) * 100000) + + for _, m := range mets { + s.Add(m, model.SampleValue(rand.Float64())) + } + require.NoError(t, c.Append(&s)) + } + // wg.Done() + // }(mets[k*100 : (k+1)*100]) + // } + // wg.Wait() + + start := time.Now() + c.memChunks.indexer.wait() + fmt.Println("index wait", time.Since(start)) + + start = time.Now() + q, err := c.Querier() + require.NoError(t, err) + defer q.Close() + + m1, err := metric.NewLabelMatcher(metric.Equal, "job", "somejob") + require.NoError(t, err) + m2, err := metric.NewLabelMatcher(metric.Equal, "label2", "value0") + require.NoError(t, err) + m3, err := metric.NewLabelMatcher(metric.Equal, "label4", "value0") + require.NoError(t, err) + + it, err := q.Iterator(m1, m2, m3) + require.NoError(t, err) + res, err := tindex.ExpandIterator(it) + require.NoError(t, err) + fmt.Println("result len", len(res)) + + fmt.Println("querying", time.Since(start)) +} + +func generateMetrics(n int) (res []model.Metric) { + for i := 0; i < n; i++ { + res = append(res, model.Metric{ + "job": "somejob", + "label5": model.LabelValue(fmt.Sprintf("value%d", i%10)), + "label4": model.LabelValue(fmt.Sprintf("value%d", i%5)), + "label3": model.LabelValue(fmt.Sprintf("value%d", i%3)), + "label2": model.LabelValue(fmt.Sprintf("value%d", i%2)), + "label1": model.LabelValue(fmt.Sprintf("value%d", i)), + }) + } + return res +} + +func TestMemChunksShardGet(t *testing.T) { + cs := &memChunksShard{ + descs: map[model.Fingerprint][]*chunkDesc{}, + csize: 100, + } + cdesc1, created1 := cs.get(123, model.Metric{"x": "1"}) + require.True(t, created1) + require.Equal(t, 1, len(cs.descs[123])) + require.Equal(t, &chunkDesc{ + met: model.Metric{"x": "1"}, + chunk: chunk.NewPlainChunk(100), + }, cdesc1) + + // Add colliding metric. + cdesc2, created2 := cs.get(123, model.Metric{"x": "2"}) + require.True(t, created2) + require.Equal(t, 2, len(cs.descs[123])) + require.Equal(t, &chunkDesc{ + met: model.Metric{"x": "2"}, + chunk: chunk.NewPlainChunk(100), + }, cdesc2) + // First chunk desc can still be retrieved correctly. + cdesc1, created1 = cs.get(123, model.Metric{"x": "1"}) + require.False(t, created1) + require.Equal(t, &chunkDesc{ + met: model.Metric{"x": "1"}, + chunk: chunk.NewPlainChunk(100), + }, cdesc1) +} + +func TestChunkSeriesIterator(t *testing.T) { + newChunk := func(s []model.SamplePair) chunk.Chunk { + c := chunk.NewPlainChunk(1000) + app := c.Appender() + for _, sp := range s { + if err := app.Append(sp.Timestamp, sp.Value); err != nil { + t.Fatal(err) + } + } + return c + } + it := newChunkSeriesIterator(metric.Metric{}, []chunk.Chunk{ + newChunk([]model.SamplePair{{1, 1}, {2, 2}, {3, 3}}), + newChunk([]model.SamplePair{{4, 4}, {5, 5}, {6, 6}}), + newChunk([]model.SamplePair{{7, 7}, {8, 8}, {9, 9}}), + }) + + var res []model.SamplePair + for sp, ok := it.Seek(0); ok; sp, ok = it.Next() { + fmt.Println(sp) + res = append(res, sp) + } + require.Equal(t, io.EOF, it.Err()) + require.Equal(t, []model.SamplePair{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 9}}, res) +} diff --git a/index.go b/index.go new file mode 100644 index 000000000..83506da5c --- /dev/null +++ b/index.go @@ -0,0 +1,130 @@ +package tsdb + +import ( + "sort" + "strconv" + "sync/atomic" + "time" + + "github.com/fabxc/tsdb/index" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" +) + +const ( + defaultIndexerTimeout = 1 * time.Second + defaultIndexerQsize = 500000 +) + +// indexer asynchronously indexes chunks in batches. It indexes all labels +// of a chunk with a forward mapping and additionally indexes the chunk for +// the time slice of its first sample. +type indexer struct { + *chunkBatchProcessor + + ix *index.Index + mc *memChunks +} + +// Create batch indexer that creates new index documents +// and indexes them by the metric fields. +// Its post-indexing hook populates the in-memory chunk forward index. +func newMetricIndexer(path string, qsz int, qto time.Duration) (*indexer, error) { + ix, err := index.Open(path, nil) + if err != nil { + return nil, err + } + + i := &indexer{ + ix: ix, + chunkBatchProcessor: newChunkBatchProcessor(log.Base(), qsz, qto), + } + i.chunkBatchProcessor.processf = i.index + + return i, nil +} + +func (ix *indexer) Querier() (*index.Querier, error) { + return ix.ix.Querier() +} + +const ( + timeSliceField = "__ts__" + timeSliceSize = 3 * time.Hour +) + +func timeSlice(t model.Time) model.Time { + return t - (t % model.Time(timeSliceSize/time.Millisecond)) +} + +func timeString(t model.Time) string { + return strconv.FormatInt(int64(t), 16) +} + +func (ix *indexer) close() error { + return ix.ix.Close() +} + +func (ix *indexer) index(cds ...*chunkDesc) error { + b, err := ix.ix.Batch() + if err != nil { + return err + } + + ids := make([]ChunkID, len(cds)) + for i, cd := range cds { + terms := make(index.Terms, 0, len(cd.met)) + for k, v := range cd.met { + t := index.Term{Field: string(k), Val: string(v)} + terms = append(terms, t) + } + id := b.Add(terms) + ts := timeSlice(cd.firstTime) + + // If the chunk has a higher time slice than the high one, + // don't index. It will be indexed when the next time slice + // is initiated over all memory chunks. + if ts <= ix.mc.highTime { + b.SecondaryIndex(id, index.Term{ + Field: timeSliceField, + Val: timeString(ts), + }) + } + + ids[i] = ChunkID(id) + } + + if err := b.Commit(); err != nil { + return err + } + + // We have to lock here already instead of post-commit as otherwise we might + // generate new chunk IDs, skip their indexing, and have a reindexTime being + // called with the chunk ID not being visible yet. + // TODO(fabxc): move back up + ix.mc.mtx.Lock() + defer ix.mc.mtx.Unlock() + + // Make in-memory chunks visible for read. + for i, cd := range cds { + atomic.StoreUint64((*uint64)(&cd.id), uint64(ids[i])) + ix.mc.chunks[cd.id] = cd + } + return nil +} + +// reindexTime creates an initial time slice index over all chunk IDs. +// Any future chunks indexed for the same time slice must have higher IDs. +func (ix *indexer) reindexTime(ids ChunkIDs, ts model.Time) error { + b, err := ix.ix.Batch() + if err != nil { + return err + } + sort.Sort(ids) + t := index.Term{Field: timeSliceField, Val: timeString(ts)} + + for _, id := range ids { + b.SecondaryIndex(index.DocID(id), t) + } + return b.Commit() +} diff --git a/persist.go b/persist.go new file mode 100644 index 000000000..56a47ed10 --- /dev/null +++ b/persist.go @@ -0,0 +1,163 @@ +package tsdb + +import ( + "encoding/binary" + "os" + "path/filepath" + "time" + + "github.com/boltdb/bolt" + "github.com/fabxc/pagebuf" + "github.com/prometheus/common/log" +) + +type persistence struct { + *chunkBatchProcessor + + mc *memChunks + chunks *pagebuf.DB + index *bolt.DB +} + +func newPersistence(path string, cap int, to time.Duration) (*persistence, error) { + if err := os.MkdirAll(path, 0777); err != nil { + return nil, err + } + ix, err := bolt.Open(filepath.Join(path, "ix"), 0666, nil) + if err != nil { + return nil, err + } + if err := ix.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bktChunks) + return err + }); err != nil { + return nil, err + } + pb, err := pagebuf.Open(filepath.Join(path, "chunks"), 0666, nil) + if err != nil { + return nil, err + } + p := &persistence{ + chunks: pb, + index: ix, + chunkBatchProcessor: newChunkBatchProcessor(log.Base(), cap, to), + } + p.chunkBatchProcessor.processf = p.persist + + return p, nil +} + +var bktChunks = []byte("chunks") + +func (p *persistence) close() error { + // Index must be closed first, otherwise we might deadlock. + err0 := p.index.Close() + err1 := p.chunks.Close() + if err0 != nil { + return err0 + } + return err1 +} + +func (p *persistence) persist(cds ...*chunkDesc) error { + err := p.update(func(tx *persistenceTx) error { + bkt := tx.ix.Bucket(bktChunks) + for _, cd := range cds { + pos, err := tx.chunks.Add(cd.chunk.Data()) + if err != nil { + return err + } + var buf [16]byte + binary.BigEndian.PutUint64(buf[:8], uint64(cd.id)) + binary.BigEndian.PutUint64(buf[8:], pos) + if err := bkt.Put(buf[:8], buf[8:]); err != nil { + return err + } + + tx.ids = append(tx.ids, cd.id) + } + return nil + }) + return err +} + +func (p *persistence) update(f func(*persistenceTx) error) error { + tx, err := p.begin(true) + if err != nil { + return err + } + if err := f(tx); err != nil { + tx.rollback() + return err + } + return tx.commit() +} + +func (p *persistence) view(f func(*persistenceTx) error) error { + tx, err := p.begin(false) + if err != nil { + return err + } + if err := f(tx); err != nil { + tx.rollback() + return err + } + return tx.rollback() +} + +func (p *persistence) begin(writeable bool) (*persistenceTx, error) { + var err error + tx := &persistenceTx{p: p} + // Index transaction is the outer one so we might end up with orphaned + // chunks but never with dangling pointers in the index. + tx.ix, err = p.index.Begin(writeable) + if err != nil { + return nil, err + } + tx.chunks, err = p.chunks.Begin(writeable) + if err != nil { + tx.ix.Rollback() + return nil, err + } + + return tx, nil +} + +type persistenceTx struct { + p *persistence + ix *bolt.Tx + chunks *pagebuf.Tx + + ids []ChunkID +} + +func (tx *persistenceTx) commit() error { + if err := tx.chunks.Commit(); err != nil { + tx.ix.Rollback() + return err + } + if err := tx.ix.Commit(); err != nil { + // TODO(fabxc): log orphaned chunks. What about overwritten ones? + // Should we not allows delete and add in the same tx so this cannot happen? + return err + } + + // Successfully persisted chunks, clear them from the in-memory + // forward mapping. + tx.p.mc.mtx.Lock() + defer tx.p.mc.mtx.Unlock() + + for _, id := range tx.ids { + delete(tx.p.mc.chunks, id) + } + return nil +} + +func (tx *persistenceTx) rollback() error { + err0 := tx.chunks.Rollback() + err1 := tx.ix.Rollback() + if err0 != nil { + return err0 + } + return err1 +} diff --git a/querier.go b/querier.go new file mode 100644 index 000000000..3313fdbd0 --- /dev/null +++ b/querier.go @@ -0,0 +1,248 @@ +package tsdb + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + + "github.com/fabxc/tsdb/chunks" + "github.com/fabxc/tsdb/index" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/metric" +) + +// SeriesIterator provides iteration over a time series associated with a metric. +type SeriesIterator interface { + Metric() metric.Metric + Seek(model.Time) (model.SamplePair, bool) + Next() (model.SamplePair, bool) + Err() error +} + +type chunkSeriesIterator struct { + m metric.Metric + chunks []chunks.Chunk + + err error + cur chunks.Iterator + curPos int +} + +func newChunkSeriesIterator(m metric.Metric, chunks []chunks.Chunk) *chunkSeriesIterator { + return &chunkSeriesIterator{ + m: m, + chunks: chunks, + } +} + +func (it *chunkSeriesIterator) Metric() metric.Metric { + return it.m +} + +func (it *chunkSeriesIterator) Seek(ts model.Time) (model.SamplePair, bool) { + // Naively go through all chunk's first timestamps and pick the chunk + // containing the seeked timestamp. + // TODO(fabxc): this can be made smarter if it's a bottleneck. + for i, chk := range it.chunks { + cit := chk.Iterator() + first, ok := cit.First() + if !ok { + it.err = cit.Err() + return model.SamplePair{}, false + } + if first.Timestamp > ts { + break + } + it.cur = cit + it.curPos = i + } + return it.cur.Seek(ts) +} + +func (it *chunkSeriesIterator) Next() (model.SamplePair, bool) { + sp, ok := it.cur.Next() + if ok { + return sp, true + } + if it.cur.Err() != io.EOF { + it.err = it.cur.Err() + return model.SamplePair{}, false + } + if len(it.chunks) == it.curPos+1 { + it.err = io.EOF + return model.SamplePair{}, false + } + it.curPos++ + it.cur = it.chunks[it.curPos].Iterator() + + // Return first sample of the new chunks. + return it.cur.Seek(0) +} + +func (it *chunkSeriesIterator) Err() error { + return it.err +} + +// Querier allows several queries over the storage with a consistent view if the data. +type Querier struct { + db *DB + iq *index.Querier +} + +// Querier returns a new Querier on the index at the current point in time. +func (db *DB) Querier() (*Querier, error) { + iq, err := db.indexer.Querier() + if err != nil { + return nil, err + } + return &Querier{db: db, iq: iq}, nil +} + +// Close the querier. This invalidates all previously retrieved iterators. +func (q *Querier) Close() error { + return q.iq.Close() +} + +// Iterator returns an iterator over all chunks that match all given +// label matchers. The iterator is only valid until the Querier is closed. +func (q *Querier) Iterator(matchers ...*metric.LabelMatcher) (index.Iterator, error) { + var its []index.Iterator + for _, m := range matchers { + var matcher index.Matcher + switch m.Type { + case metric.Equal: + matcher = index.NewEqualMatcher(string(m.Value)) + case metric.RegexMatch: + var err error + matcher, err = index.NewRegexpMatcher(string(m.Value)) + if err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("matcher type %q not supported", m.Type) + } + it, err := q.iq.Search(string(m.Name), matcher) + if err != nil { + return nil, err + } + if it != nil { + its = append(its, it) + } + } + if len(its) == 0 { + return nil, errors.New("not found") + } + return index.Intersect(its...), nil +} + +// RangeIterator returns an iterator over chunks that are present in the given time range. +// The returned iterator is only valid until the querier is closed. +func (q *Querier) RangeIterator(start, end model.Time) (index.Iterator, error) { + return nil, nil +} + +// InstantIterator returns an iterator over chunks possibly containing values for +// the given timestamp. The returned iterator is only valid until the querier is closed. +func (q *Querier) InstantIterator(at model.Time) (index.Iterator, error) { + return nil, nil +} + +// Series returns a list of series iterators over all chunks in the given iterator. +// The returned series iterators are only valid until the querier is closed. +func (q *Querier) Series(it index.Iterator) ([]SeriesIterator, error) { + mets := map[model.Fingerprint]metric.Metric{} + its := map[model.Fingerprint][]chunks.Chunk{} + + id, err := it.Seek(0) + for ; err == nil; id, err = it.Next() { + terms, err := q.iq.Doc(id) + if err != nil { + return nil, err + } + met := make(model.Metric, len(terms)) + for _, t := range terms { + met[model.LabelName(t.Field)] = model.LabelValue(t.Val) + } + fp := met.Fingerprint() + + chunk, err := q.chunk(ChunkID(id)) + if err != nil { + return nil, err + } + + its[fp] = append(its[fp], chunk) + if _, ok := mets[fp]; ok { + continue + } + mets[fp] = metric.Metric{Metric: met, Copied: true} + } + if err != io.EOF { + return nil, err + } + + res := make([]SeriesIterator, 0, len(its)) + for fp, chks := range its { + res = append(res, newChunkSeriesIterator(mets[fp], chks)) + } + return res, nil +} + +func (q *Querier) chunk(id ChunkID) (chunks.Chunk, error) { + q.db.memChunks.mtx.RLock() + cd, ok := q.db.memChunks.chunks[id] + q.db.memChunks.mtx.RUnlock() + if ok { + return cd.chunk, nil + } + + var chk chunks.Chunk + // TODO(fabxc): this starts a new read transaction for every + // chunk we have to load from persistence. + // Figure out what's best tradeoff between lock contention and + // data consistency: start transaction when instantiating the querier + // or lazily start transaction on first try. (Not all query operations + // need access to persisted chunks.) + err := q.db.persistence.view(func(tx *persistenceTx) error { + chks := tx.ix.Bucket(bktChunks) + ptr := chks.Get(id.bytes()) + if ptr == nil { + return fmt.Errorf("chunk pointer for ID %d not found", id) + } + cdata, err := tx.chunks.Get(binary.BigEndian.Uint64(ptr)) + if err != nil { + return fmt.Errorf("get chunk data for ID %d: %s", id, err) + } + chk, err = chunks.FromData(cdata) + return err + }) + return chk, err +} + +// Metrics returns the unique metrics found across all chunks in the provided iterator. +func (q *Querier) Metrics(it index.Iterator) ([]metric.Metric, error) { + m := []metric.Metric{} + fps := map[model.Fingerprint]struct{}{} + + id, err := it.Seek(0) + for ; err == nil; id, err = it.Next() { + terms, err := q.iq.Doc(id) + if err != nil { + return nil, err + } + met := make(model.Metric, len(terms)) + for _, t := range terms { + met[model.LabelName(t.Field)] = model.LabelValue(t.Val) + } + fp := met.Fingerprint() + if _, ok := fps[fp]; ok { + continue + } + fps[fp] = struct{}{} + m = append(m, metric.Metric{Metric: met, Copied: true}) + } + if err != io.EOF { + return nil, err + } + return m, nil +}