mirror of https://github.com/prometheus/prometheus
Write index with symbol table
parent
40a451694f
commit
1e0edf367b
23
db.go
23
db.go
|
@ -267,19 +267,32 @@ func (s *SeriesShard) persist() error {
|
|||
return err
|
||||
}
|
||||
|
||||
f, err := os.Create(filepath.Join(p, "series"))
|
||||
sf, err := os.Create(filepath.Join(p, "series"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
xf, err := os.Create(filepath.Join(p, "index"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w := newSeriesWriter(f, s.head.baseTimestamp)
|
||||
defer w.Close()
|
||||
iw := newIndexWriter(xf)
|
||||
sw := newSeriesWriter(sf, iw, s.head.baseTimestamp)
|
||||
|
||||
defer sw.Close()
|
||||
defer iw.Close()
|
||||
|
||||
for _, cd := range head.index.forward {
|
||||
w.WriteSeries(cd.lset, []*chunkDesc{cd})
|
||||
if err := sw.WriteSeries(cd.lset, []*chunkDesc{cd}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
sz := fmt.Sprintf("%fMiB", float64(w.Size())/1024/1024)
|
||||
if err := iw.WriteStats(nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sz := fmt.Sprintf("%fMiB", float64(sw.Size()+iw.Size())/1024/1024)
|
||||
|
||||
s.logger.With("size", sz).
|
||||
With("samples", head.samples).
|
||||
|
|
79
writer.go
79
writer.go
|
@ -1,9 +1,11 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"os"
|
||||
"sort"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
|
@ -37,15 +39,13 @@ type seriesWriter struct {
|
|||
|
||||
baseTimestamp int64
|
||||
index IndexWriter
|
||||
|
||||
chunkOffsets map[uint32][]uint32
|
||||
seriesOffsets map[uint32]uint32
|
||||
}
|
||||
|
||||
func newSeriesWriter(w io.Writer, base int64) *seriesWriter {
|
||||
func newSeriesWriter(w io.Writer, index IndexWriter, base int64) *seriesWriter {
|
||||
return &seriesWriter{
|
||||
w: w,
|
||||
n: 0,
|
||||
index: index,
|
||||
baseTimestamp: base,
|
||||
}
|
||||
}
|
||||
|
@ -148,9 +148,6 @@ type IndexWriter interface {
|
|||
// WriteStats writes final stats for the indexed block.
|
||||
WriteStats(*BlockStats) error
|
||||
|
||||
// WriteSymbols serializes all encountered string symbols.
|
||||
WriteSymbols([]string) error
|
||||
|
||||
// WriteLabelIndex serializes an index from label names to values.
|
||||
// The passed in values chained tuples of strings of the length of names.
|
||||
WriteLabelIndex(names []string, values []string) error
|
||||
|
@ -164,7 +161,7 @@ type IndexWriter interface {
|
|||
// Size returns the size of the data written so far.
|
||||
Size() int64
|
||||
|
||||
// Closes writes any finalization and closes theresources associated with
|
||||
// Close writes any finalization and closes theresources associated with
|
||||
// the underlying writer.
|
||||
Close() error
|
||||
}
|
||||
|
@ -177,19 +174,80 @@ type indexWriter struct {
|
|||
|
||||
series []Labels
|
||||
offsets [][]ChunkOffset
|
||||
|
||||
symbols map[string]uint32
|
||||
}
|
||||
|
||||
func newIndexWriter(w io.Writer) *indexWriter {
|
||||
return &indexWriter{
|
||||
w: w,
|
||||
n: 0,
|
||||
symbols: make(map[string]uint32),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *indexWriter) write(wr io.Writer, b []byte) error {
|
||||
n, err := wr.Write(b)
|
||||
w.n += int64(n)
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *indexWriter) writeMeta() error {
|
||||
meta := &meta{magic: MagicSeries, flag: flagStd}
|
||||
metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
|
||||
|
||||
return w.write(w.w, metab)
|
||||
}
|
||||
|
||||
func (w *indexWriter) AddOffsets(lset Labels, offsets ...ChunkOffset) {
|
||||
w.series = append(w.series, lset)
|
||||
w.offsets = append(w.offsets, offsets)
|
||||
|
||||
// Populate the symbol table from all label sets we have to reference.
|
||||
for _, l := range lset {
|
||||
w.symbols[l.Name] = 0
|
||||
w.symbols[l.Value] = 0
|
||||
}
|
||||
}
|
||||
|
||||
func (w *indexWriter) WriteStats(*BlockStats) error {
|
||||
if w.n == 0 {
|
||||
if err := w.writeMeta(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.writeSymbols(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *indexWriter) WriteSymbols(symbols []string) error {
|
||||
return nil
|
||||
func (w *indexWriter) writeSymbols() error {
|
||||
// Generate sorted list of strings we will store as reference table.
|
||||
symbols := make([]string, 0, len(w.symbols))
|
||||
for s := range w.symbols {
|
||||
symbols = append(symbols, s)
|
||||
}
|
||||
sort.Strings(symbols)
|
||||
|
||||
h := crc32.NewIEEE()
|
||||
wr := io.MultiWriter(h, w.w)
|
||||
|
||||
buf := make([]byte, binary.MaxVarintLen32)
|
||||
|
||||
for _, s := range symbols {
|
||||
n := binary.PutUvarint(buf, uint64(len(s)))
|
||||
w.symbols[s] = uint32(w.n)
|
||||
|
||||
if err := w.write(wr, buf[:n]); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.write(wr, []byte(s)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return w.write(w.w, h.Sum(nil))
|
||||
}
|
||||
|
||||
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||
|
@ -207,6 +265,7 @@ func (w *indexWriter) WritePostings(name, value string, it Iterator) error {
|
|||
func (w *indexWriter) Size() int64 {
|
||||
return w.n
|
||||
}
|
||||
|
||||
func (w *indexWriter) Close() error {
|
||||
if f, ok := w.w.(*os.File); ok {
|
||||
if err := f.Sync(); err != nil {
|
||||
|
|
Loading…
Reference in New Issue