diff --git a/adapter.go b/adapter.go index 761dfa1e9..9774ab18a 100644 --- a/adapter.go +++ b/adapter.go @@ -5,7 +5,6 @@ package tsdb // "fmt" // "time" -// "github.com/fabxc/tsdb/index" // "github.com/prometheus/common/model" // "github.com/prometheus/prometheus/storage/local" // "github.com/prometheus/prometheus/storage/metric" @@ -13,33 +12,37 @@ package tsdb // ) // type DefaultSeriesIterator struct { +// s Series // it SeriesIterator // } // func (it *DefaultSeriesIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair { -// sp, ok := it.it.Seek(ts) +// ok := it.it.Seek(int64(ts)) // if !ok { // return model.SamplePair{Timestamp: model.Earliest} // } -// return sp +// t, v := it.it.Values() +// return model.SamplePair{Timestamp: model.Time(t), Value: model.SampleValue(v)} // } // func (it *DefaultSeriesIterator) Metric() metric.Metric { -// m := it.it.Metric() -// met := make(model.Metric, len(m)) -// for k, v := range m { -// met[model.LabelName(k)] = model.LabelValue(v) +// ls := it.s.Labels() +// met := make(model.Metric, len(ls)) +// for _, l := range ls { +// met[model.LabelName(l.Name)] = model.LabelValue(l.Value) // } // return metric.Metric{Metric: met, Copied: true} // } // func (it *DefaultSeriesIterator) RangeValues(interval metric.Interval) []model.SamplePair { // var res []model.SamplePair -// for sp, ok := it.it.Seek(interval.NewestInclusive); ok; sp, ok = it.it.Next() { -// if sp.Timestamp > interval.OldestInclusive { + +// for ok := it.it.Seek(int64(interval.NewestInclusive)); ok; ok = it.it.Next() { +// t, v := it.it.Values() +// if model.Time(t) > interval.OldestInclusive { // break // } -// res = append(res, sp) +// res = append(res, model.SamplePair{Timestamp: model.Time(t), Value: model.SampleValue(v)}) // } // return res // } @@ -80,12 +83,15 @@ package tsdb // // indexed. Indexing is needed for FingerprintsForLabelMatchers and // // LabelValuesForLabelName and may lag behind. // func (da *DefaultAdapter) WaitForIndexing() { -// da.db.indexer.wait() // } // func (da *DefaultAdapter) Append(s *model.Sample) error { +// labels := make([]Label, len(s.Metric)) +// for k, v := range s.Metric { +// labels = append(labels, Label{Name: string(k), Value: string(v)}) +// } // // Ignore the Scrape batching for now. -// return da.db.memChunks.append(s.Metric, s.Timestamp, s.Value) +// return da.db.appendSingle(labels, int64(s.Timestamp), float64(s.Value)) // } // func (da *DefaultAdapter) NeedsThrottling() bool { @@ -93,15 +99,16 @@ package tsdb // } // func (da *DefaultAdapter) Querier() (local.Querier, error) { -// q, err := da.db.Querier() -// if err != nil { -// return nil, err -// } +// // q, err := da.db.Querier() +// // if err != nil { +// // return nil, err +// // } + // return defaultQuerierAdapter{q: q}, nil // } // type defaultQuerierAdapter struct { -// q *Querier +// q Querier // } // func (da defaultQuerierAdapter) Close() error { diff --git a/block.go b/block.go index 3cbbcd41a..9043a7a23 100644 --- a/block.go +++ b/block.go @@ -48,11 +48,11 @@ const ( ) const ( - seriesMetaSize = int(unsafe.Sizeof(seriesMeta{})) + metaSize = int(unsafe.Sizeof(meta{})) seriesStatsSize = int(unsafe.Sizeof(blockStats{})) ) -type seriesMeta struct { +type meta struct { magic uint32 flag byte _ [7]byte // padding/reserved @@ -64,19 +64,19 @@ type blockStats struct { _ [4]byte // padding/reserved } -func (s *persistedSeries) meta() *seriesMeta { - return (*seriesMeta)(unsafe.Pointer(&s.data[0])) +func (s *persistedSeries) meta() *meta { + return (*meta)(unsafe.Pointer(&s.data[0])) } func (s *persistedSeries) stats() *blockStats { // The stats start right behind the block meta data. - return (*blockStats)(unsafe.Pointer(&s.data[seriesMetaSize])) + return (*blockStats)(unsafe.Pointer(&s.data[metaSize])) } // seriesAt returns the series stored at offset as a skiplist and the chunks // it points to as a byte slice. func (s *persistedSeries) seriesAt(offset int) (skiplist, []byte, error) { - offset += seriesMetaSize + offset += metaSize offset += seriesStatsSize switch b := s.data[offset]; b { @@ -157,8 +157,8 @@ func (bw *blockWriter) writeSeries(ow io.Writer) (n int64, err error) { // However, we'll have to pick correct endianness for the unsafe casts to work // when reading again. That and the added slowness due to reflection seem to make // it somewhat pointless. - meta := &seriesMeta{magic: magicSeries, flag: flagStd} - metab := ((*[seriesMetaSize]byte)(unsafe.Pointer(meta)))[:] + meta := &meta{magic: magicSeries, flag: flagStd} + metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:] m, err := w.Write(metab) if err != nil { @@ -205,3 +205,22 @@ func (bw *blockWriter) writeSeries(ow io.Writer) (n int64, err error) { m, err = ow.Write(h.Sum(nil)) return n + int64(m), err } + +func (bw *blockWriter) writeIndex(ow io.Writer) (n int64, err error) { + // Duplicate all writes through a CRC64 hash writer. + h := crc64.New(crc64.MakeTable(crc64.ECMA)) + w := io.MultiWriter(h, ow) + + meta := &meta{magic: magicSeries, flag: flagStd} + metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:] + + m, err := w.Write(metab) + if err != nil { + return n + int64(m), err + } + n += int64(m) + + // Write checksum to the original writer. + m, err = ow.Write(h.Sum(nil)) + return n + int64(m), err +} diff --git a/db.go b/db.go index 059764726..c13dfec2f 100644 --- a/db.go +++ b/db.go @@ -108,6 +108,17 @@ func (db *DB) AppendVector(ts int64, v *Vector) error { return nil } +func (db *DB) appendSingle(lset Labels, ts int64, v float64) error { + h := lset.Hash() + s := uint16(h >> (64 - seriesShardShift)) + + return db.shards[s].appendBatch(ts, Sample{ + Hash: h, + Labels: lset, + Value: v, + }) +} + // Matcher matches a string. type Matcher interface { // Match returns true if the matcher applies to the string value. @@ -142,8 +153,7 @@ type Querier interface { // Series represents a single time series. type Series interface { - // LabelsRef returns the label set reference - LabelRefs() LabelRefs + Labels() Labels // Iterator returns a new iterator of the data of the series. Iterator() SeriesIterator } @@ -192,24 +202,9 @@ func NewSeriesShard(path string, logger log.Logger) *SeriesShard { // Use actual time for now. s.head = NewHeadBlock(time.Now().UnixNano() / int64(time.Millisecond)) - go s.run() - return s } -func (s *SeriesShard) run() { - // for { - // select { - // case <-s.done: - // return - // case <-s.persistCh: - // if err := s.persist(); err != nil { - // s.logger.With("err", err).Error("persistence failed") - // } - // } - // } -} - // Close the series shard. func (s *SeriesShard) Close() error { close(s.done) @@ -241,9 +236,9 @@ func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error { // TODO(fabxc): randomize over time if s.head.stats().samples/uint64(s.head.stats().chunks) > 400 { + s.persist() select { case s.persistCh <- struct{}{}: - s.logger.Debug("trigger persistence") go s.persist() default: }