diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 06c7e2563..700c5da7e 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -15,6 +15,7 @@ package index import ( "bufio" + "bytes" "context" "encoding/binary" "hash" @@ -116,8 +117,9 @@ type Writer struct { fPO *fileWriter cntPO uint64 - toc TOC - stage indexWriterStage + toc TOC + stage indexWriterStage + postingsStart uint64 // Due to padding, can differ from TOC entry. // Reusable memory. buf1 encoding.Encbuf @@ -128,9 +130,8 @@ type Writer struct { symbolFile *fileutil.MmapFile lastSymbol string - labelIndexes []labelIndexHashEntry // Label index offsets. - labelValues map[string]map[uint32]struct{} // Label names, and their values's symbol indexes. - labelNames map[string]uint64 // Label names, and their usage. + labelIndexes []labelIndexHashEntry // Label index offsets. + labelNames map[string]uint64 // Label names, and their usage. // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels @@ -223,9 +224,8 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - labelNames: make(map[string]uint64, 1<<8), - labelValues: make(map[string]map[uint32]struct{}, 1<<8), - crc32: newCRC32(), + labelNames: make(map[string]uint64, 1<<8), + crc32: newCRC32(), } if err := iw.writeMeta(); err != nil { return nil, err @@ -277,7 +277,7 @@ func (fw *fileWriter) write(bufs ...[]byte) error { // Once we move to compressed/varint representations in those areas, this limitation // can be lifted. if fw.pos > 16*math.MaxUint32 { - return errors.Errorf("exceeding max size of 64GiB") + return errors.Errorf("%q exceeding max size of 64GiB", fw.name) } } return nil @@ -331,9 +331,11 @@ func (w *Writer) ensureStage(s indexWriterStage) error { if w.stage == s { return nil } - if w.stage+1 < s { + if w.stage < s-1 { // A stage has been skipped. - w.ensureStage(s - 1) + if err := w.ensureStage(s - 1); err != nil { + return err + } } if w.stage > s { return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) @@ -420,7 +422,6 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta w.buf2.PutUvarint(len(lset)) for _, l := range lset { - // here we have an index for the symbol file if v2, otherwise it's an offset index, err := w.symbols.ReverseLookup(l.Name) if err != nil { return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err) @@ -433,11 +434,6 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err) } w.buf2.PutUvarint32(index) - - if _, ok := w.labelValues[l.Name]; !ok { - w.labelValues[l.Name] = map[uint32]struct{}{} - } - w.labelValues[l.Name][index] = struct{}{} } w.buf2.PutUvarint(len(chunks)) @@ -536,19 +532,52 @@ func (w *Writer) finishSymbols() error { } func (w *Writer) writeLabelIndices() error { - names := make([]string, 0, len(w.labelValues)) - for n := range w.labelValues { - names = append(names, n) + if err := w.fPO.flush(); err != nil { + return err } - sort.Strings(names) - for _, n := range names { - values := make([]uint32, 0, len(w.labelValues[n])) - for v := range w.labelValues[n] { - values = append(values, v) + // Find all the label values in the tmp posting offset table. + f, err := fileutil.OpenMmapFile(w.fPO.name) + if err != nil { + return err + } + defer f.Close() + + d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos)) + cnt := w.cntPO + current := []byte{} + values := []uint32{} + for d.Err() == nil && cnt > 0 { + cnt-- + d.Uvarint() // Keycount. + name := d.UvarintBytes() // Label name. + value := yoloString(d.UvarintBytes()) // Label value. + d.Uvarint64() // Offset. + if len(name) == 0 { + continue // All index is ignored. + } + + if !bytes.Equal(name, current) && len(values) > 0 { + // We've reached a new label name. + if err := w.writeLabelIndex(string(current), values); err != nil { + return err + } + values = values[:0] } - sort.Sort(uint32slice(values)) - if err := w.writeLabelIndex(n, values); err != nil { + current = name + sid, err := w.symbols.ReverseLookup(value) + if err != nil { + return err + } + values = append(values, sid) + } + if d.Err() != nil { + return d.Err() + } + + // Handle the last label. + if len(values) > 0 { + if err := w.writeLabelIndex(string(current), values); err != nil { return err } } @@ -657,7 +686,7 @@ func (w *Writer) writePostingsOffsetTable() error { // Copy over the tmp posting offset table, however we need to // adjust the offsets. - adjustment := w.toc.Postings + adjustment := w.postingsStart w.buf1.Reset() w.crc32.Reset() @@ -888,10 +917,11 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { } func (w *Writer) writePostings() error { - // There's padding in the tmp filem make sure it actually works. + // There's padding in the tmp file, make sure it actually works. if err := w.f.addPadding(4); err != nil { return err } + w.postingsStart = w.f.pos // Copy temporary file into main index. if err := w.fP.flush(); err != nil { @@ -910,6 +940,9 @@ func (w *Writer) writePostings() error { } w.f.pos += uint64(n) + if err := w.fP.close(); err != nil { + return err + } if err := w.fP.remove(); err != nil { return err } @@ -928,11 +961,6 @@ type labelIndexHashEntry struct { offset uint64 } -type postingsHashEntry struct { - name, value string - offset uint64 -} - func (w *Writer) Close() error { if err := w.ensureStage(idxStageDone); err != nil { return err