Merge pull request #15455 from bboreham/compact-cache-symbols

[PERF] TSDB: Cache all symbols for compaction
pull/15473/head
Ganesh Vernekar 2024-11-26 16:47:21 -05:00 committed by GitHub
commit 4dacd7572a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 21 additions and 39 deletions

View File

@@ -110,12 +110,6 @@ func newCRC32() hash.Hash32 {
return crc32.New(castagnoliTable) return crc32.New(castagnoliTable)
} }
type symbolCacheEntry struct {
index uint32
lastValueIndex uint32
lastValue string
}
type PostingsEncoder func(*encoding.Encbuf, []uint32) error type PostingsEncoder func(*encoding.Encbuf, []uint32) error
type PostingsDecoder func(encoding.Decbuf) (int, Postings, error) type PostingsDecoder func(encoding.Decbuf) (int, Postings, error)
@@ -146,7 +140,7 @@ type Writer struct {
symbols *Symbols symbols *Symbols
symbolFile *fileutil.MmapFile symbolFile *fileutil.MmapFile
lastSymbol string lastSymbol string
symbolCache map[string]symbolCacheEntry symbolCache map[string]uint32 // From symbol to index in table.
labelIndexes []labelIndexHashEntry // Label index offsets. labelIndexes []labelIndexHashEntry // Label index offsets.
labelNames map[string]uint64 // Label names, and their usage. labelNames map[string]uint64 // Label names, and their usage.
@@ -246,7 +240,7 @@ func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncode
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
symbolCache: make(map[string]symbolCacheEntry, 1<<8), symbolCache: make(map[string]uint32, 1<<16),
labelNames: make(map[string]uint64, 1<<8), labelNames: make(map[string]uint64, 1<<8),
crc32: newCRC32(), crc32: newCRC32(),
postingsEncoder: encoder, postingsEncoder: encoder,
@@ -478,29 +472,16 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
w.buf2.PutUvarint(lset.Len()) w.buf2.PutUvarint(lset.Len())
if err := lset.Validate(func(l labels.Label) error { if err := lset.Validate(func(l labels.Label) error {
var err error nameIndex, ok := w.symbolCache[l.Name]
cacheEntry, ok := w.symbolCache[l.Name]
nameIndex := cacheEntry.index
if !ok { if !ok {
nameIndex, err = w.symbols.ReverseLookup(l.Name) return fmt.Errorf("symbol entry for %q does not exist", l.Name)
if err != nil {
return fmt.Errorf("symbol entry for %q does not exist, %w", l.Name, err)
}
} }
w.labelNames[l.Name]++ w.labelNames[l.Name]++
w.buf2.PutUvarint32(nameIndex) w.buf2.PutUvarint32(nameIndex)
valueIndex := cacheEntry.lastValueIndex valueIndex, ok := w.symbolCache[l.Value]
if !ok || cacheEntry.lastValue != l.Value { if !ok {
valueIndex, err = w.symbols.ReverseLookup(l.Value) return fmt.Errorf("symbol entry for %q does not exist", l.Value)
if err != nil {
return fmt.Errorf("symbol entry for %q does not exist, %w", l.Value, err)
}
w.symbolCache[l.Name] = symbolCacheEntry{
index: nameIndex,
lastValueIndex: valueIndex,
lastValue: l.Value,
}
} }
w.buf2.PutUvarint32(valueIndex) w.buf2.PutUvarint32(valueIndex)
return nil return nil
@@ -559,6 +540,7 @@ func (w *Writer) AddSymbol(sym string) error {
return fmt.Errorf("symbol %q out-of-order", sym) return fmt.Errorf("symbol %q out-of-order", sym)
} }
w.lastSymbol = sym w.lastSymbol = sym
w.symbolCache[sym] = uint32(w.numSymbols)
w.numSymbols++ w.numSymbols++
w.buf1.Reset() w.buf1.Reset()
w.buf1.PutUvarintStr(sym) w.buf1.PutUvarintStr(sym)
@@ -630,7 +612,7 @@ func (w *Writer) writeLabelIndices() error {
cnt-- cnt--
d.Uvarint() // Keycount. d.Uvarint() // Keycount.
name := d.UvarintBytes() // Label name. name := d.UvarintBytes() // Label name.
value := yoloString(d.UvarintBytes()) // Label value. value := d.UvarintBytes() // Label value.
d.Uvarint64() // Offset. d.Uvarint64() // Offset.
if len(name) == 0 { if len(name) == 0 {
continue // All index is ignored. continue // All index is ignored.
@ -644,9 +626,9 @@ func (w *Writer) writeLabelIndices() error {
values = values[:0] values = values[:0]
} }
current = name current = name
sid, err := w.symbols.ReverseLookup(value) sid, ok := w.symbolCache[string(value)]
if err != nil { if !ok {
return err return fmt.Errorf("symbol entry for %q does not exist", string(value))
} }
values = append(values, sid) values = append(values, sid)
} }
@@ -918,9 +900,9 @@ func (w *Writer) writePostingsToTmpFiles() error {
nameSymbols := map[uint32]string{} nameSymbols := map[uint32]string{}
for _, name := range batchNames { for _, name := range batchNames {
sid, err := w.symbols.ReverseLookup(name) sid, ok := w.symbolCache[name]
if err != nil { if !ok {
return err return fmt.Errorf("symbol entry for %q does not exist", name)
} }
nameSymbols[sid] = name nameSymbols[sid] = name
} }
@@ -957,9 +939,9 @@ func (w *Writer) writePostingsToTmpFiles() error {
for _, name := range batchNames { for _, name := range batchNames {
// Write out postings for this label name. // Write out postings for this label name.
sid, err := w.symbols.ReverseLookup(name) sid, ok := w.symbolCache[name]
if err != nil { if !ok {
return err return fmt.Errorf("symbol entry for %q does not exist", name)
} }
values := make([]uint32, 0, len(postings[sid])) values := make([]uint32, 0, len(postings[sid]))
for v := range postings[sid] { for v := range postings[sid] {