diff --git a/tsdb/block.go b/tsdb/block.go index e61d43ae4..ce75a56b2 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -69,14 +69,14 @@ type IndexReader interface { // and indices. Symbols() (map[string]struct{}, error) - // LabelValues returns the possible label values. + // LabelValues returns sorted possible label values. LabelValues(names ...string) (index.StringTuples, error) - // Postings returns the postings list iterator for the label pair. + // Postings returns the postings list iterator for the label pairs. // The Postings here contain the offsets to the series inside the index. - // Found IDs are not strictly required to point to a valid Series, e.g. during - // background garbage collections. - Postings(name, value string) (index.Postings, error) + // Found IDs are not strictly required to point to a valid Series, e.g. + // during background garbage collections. Input values must be sorted. + Postings(name string, values ...string) (index.Postings, error) // SortedPostings returns a postings list that is reordered to be sorted // by the label set of the underlying series. @@ -450,8 +450,8 @@ func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, erro return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) } -func (r blockIndexReader) Postings(name, value string) (index.Postings, error) { - p, err := r.ir.Postings(name, value) +func (r blockIndexReader) Postings(name string, values ...string) (index.Postings, error) { + p, err := r.ir.Postings(name, values...) if err != nil { return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) } diff --git a/tsdb/compact.go b/tsdb/compact.go index e446235fb..da304ed12 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -708,7 +708,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, allSymbols[s] = struct{}{} } - all, err := indexr.Postings(index.AllPostingsKey()) + k, v := index.AllPostingsKey() + all, err := indexr.Postings(k, v) if err != nil { return err } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 10455dbc6..1168460ec 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1421,7 +1421,8 @@ func TestChunkAtBlockBoundary(t *testing.T) { meta := block.Meta() - p, err := r.Postings(index.AllPostingsKey()) + k, v := index.AllPostingsKey() + p, err := r.Postings(k, v) testutil.Ok(t, err) var ( diff --git a/tsdb/docs/format/index.md b/tsdb/docs/format/index.md index 7d20b7d33..fd6bf043c 100644 --- a/tsdb/docs/format/index.md +++ b/tsdb/docs/format/index.md @@ -203,9 +203,9 @@ They are used to track label index sections. They are read into memory when an i ### Postings Offset Table -A postings offset table stores a sequence of postings offset entries. +A postings offset table stores a sequence of postings offset entries, sorted by label name and value. Every postings offset entry holds the label name/value pair and the offset to its series list in the postings section. -They are used to track postings sections. They are read into memory when an index file is loaded. +They are used to track postings sections. They are partially read into memory when an index file is loaded. ``` ┌─────────────────────┬──────────────────────┐ diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index 3e2e2804a..f2e103756 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -119,8 +119,11 @@ func NewDecbufAt(bs ByteSlice, off int, castagnoliTable *crc32.Table) Decbuf { b = bs.Range(off+4, off+4+l+4) dec := Decbuf{B: b[:len(b)-4]} - if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.Crc32(castagnoliTable) != exp { - return Decbuf{E: ErrInvalidChecksum} + if castagnoliTable != nil { + + if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.Crc32(castagnoliTable) != exp { + return Decbuf{E: ErrInvalidChecksum} + } } return dec } @@ -164,16 +167,30 @@ func (d *Decbuf) Crc32(castagnoliTable *crc32.Table) uint32 { return crc32.Checksum(d.B, castagnoliTable) } +func (d *Decbuf) Skip(l int) { + if len(d.B) < l { + d.E = ErrInvalidSize + return + } + d.B = d.B[l:] +} + func (d *Decbuf) UvarintStr() string { + return string(d.UvarintBytes()) +} + +// The return value becomes invalid if the byte slice goes away. +// Compared to UvarintStr, this avoid allocations. +func (d *Decbuf) UvarintBytes() []byte { l := d.Uvarint64() if d.E != nil { - return "" + return []byte{} } if len(d.B) < int(l) { d.E = ErrInvalidSize - return "" + return []byte{} } - s := string(d.B[:l]) + s := d.B[:l] d.B = d.B[l:] return s } diff --git a/tsdb/head.go b/tsdb/head.go index 34d40bb71..08379d614 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1383,9 +1383,13 @@ func (h *headIndexReader) LabelNames() ([]string, error) { return labelNames, nil } -// Postings returns the postings list iterator for the label pair. -func (h *headIndexReader) Postings(name, value string) (index.Postings, error) { - return h.head.postings.Get(name, value), nil +// Postings returns the postings list iterator for the label pairs. +func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) { + res := make([]index.Postings, 0, len(values)) + for _, value := range values { + res = append(res, h.head.postings.Get(name, value)) + } + return index.Merge(res...), nil } func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 592cb1f69..61b80427c 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -700,15 +700,17 @@ type StringTuples interface { } type Reader struct { - b ByteSlice + b ByteSlice + toc *TOC // Close that releases the underlying resources of the byte slice. c io.Closer // Cached hashmaps of section offsets. labels map[string]uint64 - // LabelName to LabelValue to offset map. - postings map[string]map[string]uint64 + // Map of LabelName to a list of some LabelValues's position in the offset table. + // The first and last values for each name are always present. + postings map[string][]postingOffset // Cache of read symbols. Strings that are returned when reading from the // block are always backed by true strings held in here rather than // strings that are backed by byte slices from the mmap'd index file. This @@ -724,6 +726,11 @@ type Reader struct { version int } +type postingOffset struct { + value string + off int +} + // ByteSlice abstracts a byte slice. type ByteSlice interface { Len() int @@ -772,7 +779,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { b: b, c: c, labels: map[string]uint64{}, - postings: map[string]map[string]uint64{}, + postings: map[string][]postingOffset{}, } // Verify header. @@ -788,12 +795,13 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { return nil, errors.Errorf("unknown index file version %d", r.version) } - toc, err := NewTOCFromByteSlice(b) + var err error + r.toc, err = NewTOCFromByteSlice(b) if err != nil { return nil, errors.Wrap(err, "read TOC") } - r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(toc.Symbols)) + r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(r.toc.Symbols)) if err != nil { return nil, errors.Wrap(err, "read symbols") } @@ -811,7 +819,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { allocatedSymbols[s] = s } - if err := ReadOffsetTable(r.b, toc.LabelIndicesTable, func(key []string, off uint64) error { + if err := ReadOffsetTable(r.b, r.toc.LabelIndicesTable, func(key []string, off uint64, _ int) error { if len(key) != 1 { return errors.Errorf("unexpected key length for label indices table %d", len(key)) } @@ -822,19 +830,46 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { return nil, errors.Wrap(err, "read label index table") } - r.postings[""] = map[string]uint64{} - if err := ReadOffsetTable(r.b, toc.PostingsTable, func(key []string, off uint64) error { + var lastKey []string + lastOff := 0 + valueCount := 0 + // For the postings offset table we keep every label name but only every nth + // label value (plus the first and last one), to save memory. + if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error { if len(key) != 2 { return errors.Errorf("unexpected key length for posting table %d", len(key)) } if _, ok := r.postings[key[0]]; !ok { - r.postings[allocatedSymbols[key[0]]] = map[string]uint64{} + // Next label name. + r.postings[allocatedSymbols[key[0]]] = []postingOffset{} + if lastKey != nil { + // Always include last value for each label name. + r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: allocatedSymbols[lastKey[1]], off: lastOff}) + } + lastKey = nil + valueCount = 0 } - r.postings[key[0]][allocatedSymbols[key[1]]] = off + if valueCount%32 == 0 { + r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: allocatedSymbols[key[1]], off: off}) + lastKey = nil + } else { + lastKey = key + lastOff = off + } + valueCount++ return nil }); err != nil { return nil, errors.Wrap(err, "read postings table") } + if lastKey != nil { + r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: allocatedSymbols[lastKey[1]], off: lastOff}) + } + // Trim any extra space in the slices. + for k, v := range r.postings { + l := make([]postingOffset, len(v)) + copy(l, v) + r.postings[k] = l + } r.dec = &Decoder{LookupSymbol: r.lookupSymbol} @@ -855,18 +890,21 @@ type Range struct { // for all postings lists. func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { m := map[labels.Label]Range{} - - for k, e := range r.postings { - for v, start := range e { - d := encoding.NewDecbufAt(r.b, int(start), castagnoliTable) - if d.Err() != nil { - return nil, d.Err() - } - m[labels.Label{Name: k, Value: v}] = Range{ - Start: int64(start) + 4, - End: int64(start) + 4 + int64(d.Len()), - } + if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error { + if len(key) != 2 { + return errors.Errorf("unexpected key length for posting table %d", len(key)) } + d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable) + if d.Err() != nil { + return d.Err() + } + m[labels.Label{Name: key[0], Value: key[1]}] = Range{ + Start: int64(off) + 4, + End: int64(off) + 4 + int64(d.Len()), + } + return nil + }); err != nil { + return nil, errors.Wrap(err, "read postings table") } return m, nil } @@ -908,17 +946,18 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin // ReadOffsetTable reads an offset table and at the given position calls f for each // found entry. If f returns an error it stops decoding and returns the received error. -func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error { +func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64, int) error) error { d := encoding.NewDecbufAt(bs, int(off), castagnoliTable) + startLen := d.Len() cnt := d.Be32() - // The Postings offset table takes only 2 keys per entry (name and value of label), - // and the LabelIndices offset table takes only 1 key per entry (a label name). - // Hence setting the size to max of both, i.e. 2. - keys := make([]string, 0, 2) for d.Err() == nil && d.Len() > 0 && cnt > 0 { + offsetPos := startLen - d.Len() keyCount := d.Uvarint() - keys = keys[:0] + // The Postings offset table takes only 2 keys per entry (name and value of label), + // and the LabelIndices offset table takes only 1 key per entry (a label name). + // Hence setting the size to max of both, i.e. 2. + keys := make([]string, 0, 2) for i := 0; i < keyCount; i++ { keys = append(keys, d.UvarintStr()) @@ -927,7 +966,7 @@ func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) e if d.Err() != nil { break } - if err := f(keys, o); err != nil { + if err := f(keys, o, offsetPos); err != nil { return err } cnt-- @@ -1027,25 +1066,82 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series") } -// Postings returns a postings list for the given label pair. -func (r *Reader) Postings(name, value string) (Postings, error) { +func (r *Reader) Postings(name string, values ...string) (Postings, error) { e, ok := r.postings[name] if !ok { return EmptyPostings(), nil } - off, ok := e[value] - if !ok { + + if len(values) == 0 { return EmptyPostings(), nil } - d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable) - if d.Err() != nil { - return nil, errors.Wrap(d.Err(), "get postings entry") + + res := make([]Postings, 0, len(values)) + skip := 0 + valueIndex := 0 + for valueIndex < len(values) && values[valueIndex] < e[0].value { + // Discard values before the start. + valueIndex++ } - _, p, err := r.dec.Postings(d.Get()) - if err != nil { - return nil, errors.Wrap(err, "decode postings") + for valueIndex < len(values) { + value := values[valueIndex] + + i := sort.Search(len(e), func(i int) bool { return e[i].value >= value }) + if i == len(e) { + // We're past the end. + break + } + if i > 0 && e[i].value != value { + // Need to look from previous entry. + i-- + } + // Don't Crc32 the entire postings offset table, this is very slow + // so hope any issues were caught at startup. + d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) + d.Skip(e[i].off) + + // Iterate on the offset table. + var postingsOff uint64 // The offset into the postings table. + for d.Err() == nil { + if skip == 0 { + // These are always the same number of bytes, + // and it's faster to skip than parse. + skip = d.Len() + d.Uvarint() // Keycount. + d.UvarintBytes() // Label name. + skip -= d.Len() + } else { + d.Skip(skip) + } + v := d.UvarintBytes() // Label value. + postingsOff = d.Uvarint64() // Offset. + for string(v) >= value { + if string(v) == value { + // Read from the postings table. + d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) + _, p, err := r.dec.Postings(d2.Get()) + if err != nil { + return nil, errors.Wrap(err, "decode postings") + } + res = append(res, p) + } + valueIndex++ + if valueIndex == len(values) { + break + } + value = values[valueIndex] + } + if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) { + // Need to go to a later postings offset entry, if there is one. + break + } + } + if d.Err() != nil { + return nil, errors.Wrap(d.Err(), "get postings offset entry") + } } - return p, nil + + return Merge(res...), nil } // SortedPostings returns the given postings list reordered so that the backing series diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 947541bac..95611d093 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -14,6 +14,7 @@ package index import ( + "fmt" "io/ioutil" "math/rand" "os" @@ -111,9 +112,13 @@ func (m mockIndex) LabelValues(names ...string) (StringTuples, error) { return NewStringTuples(m.labelIndex[names[0]], 1) } -func (m mockIndex) Postings(name, value string) (Postings, error) { - l := labels.Label{Name: name, Value: value} - return NewListPostings(m.postings[l]), nil +func (m mockIndex) Postings(name string, values ...string) (Postings, error) { + p := []Postings{} + for _, value := range values { + l := labels.Label{Name: name, Value: value} + p = append(p, NewListPostings(m.postings[l])) + } + return Merge(p...), nil } func (m mockIndex) SortedPostings(p Postings) Postings { @@ -238,6 +243,96 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, ir.Close()) } +func TestPostingsMany(t *testing.T) { + dir, err := ioutil.TempDir("", "test_postings_many") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + fn := filepath.Join(dir, indexFilename) + + iw, err := NewWriter(fn) + testutil.Ok(t, err) + + // Create a label in the index which has 999 values. + symbols := map[string]struct{}{} + series := []labels.Labels{} + for i := 1; i < 1000; i++ { + v := fmt.Sprintf("%03d", i) + series = append(series, labels.FromStrings("i", v, "foo", "bar")) + symbols[v] = struct{}{} + } + symbols["i"] = struct{}{} + symbols["foo"] = struct{}{} + symbols["bar"] = struct{}{} + testutil.Ok(t, iw.AddSymbols(symbols)) + + for i, s := range series { + testutil.Ok(t, iw.AddSeries(uint64(i), s)) + } + for i, s := range series { + testutil.Ok(t, iw.WritePostings("i", s.Get("i"), newListPostings(uint64(i)))) + } + testutil.Ok(t, iw.Close()) + + ir, err := NewFileReader(fn) + testutil.Ok(t, err) + + cases := []struct { + in []string + }{ + // Simple cases, everything is present. + {in: []string{"002"}}, + {in: []string{"031", "032", "033"}}, + {in: []string{"032", "033"}}, + {in: []string{"127", "128"}}, + {in: []string{"127", "128", "129"}}, + {in: []string{"127", "129"}}, + {in: []string{"128", "129"}}, + {in: []string{"998", "999"}}, + {in: []string{"999"}}, + // Before actual values. + {in: []string{"000"}}, + {in: []string{"000", "001"}}, + {in: []string{"000", "002"}}, + // After actual values. + {in: []string{"999a"}}, + {in: []string{"999", "999a"}}, + {in: []string{"998", "999", "999a"}}, + // In the middle of actual values. + {in: []string{"126a", "127", "128"}}, + {in: []string{"127", "127a", "128"}}, + {in: []string{"127", "127a", "128", "128a", "129"}}, + {in: []string{"127", "128a", "129"}}, + {in: []string{"128", "128a", "129"}}, + {in: []string{"128", "129", "129a"}}, + {in: []string{"126a", "126b", "127", "127a", "127b", "128", "128a", "128b", "129", "129a", "129b"}}, + } + + for _, c := range cases { + it, err := ir.Postings("i", c.in...) + testutil.Ok(t, err) + + got := []string{} + var lbls labels.Labels + var metas []chunks.Meta + for it.Next() { + testutil.Ok(t, ir.Series(it.At(), &lbls, &metas)) + got = append(got, lbls.Get("i")) + } + testutil.Ok(t, it.Err()) + exp := []string{} + for _, e := range c.in { + if _, ok := symbols[e]; ok && e != "l" { + exp = append(exp, e) + } + } + testutil.Equals(t, exp, got, fmt.Sprintf("input: %v", c.in)) + } + +} + func TestPersistence_index_e2e(t *testing.T) { dir, err := ioutil.TempDir("", "test_persistence_e2e") testutil.Ok(t, err) @@ -327,12 +422,10 @@ func TestPersistence_index_e2e(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, mi.WritePostings("", "", newListPostings(all...))) - for n, e := range postings.m { - for v := range e { - err = iw.WritePostings(n, v, postings.Get(n, v)) - testutil.Ok(t, err) - mi.WritePostings(n, v, postings.Get(n, v)) - } + for _, l := range postings.SortedKeys() { + err := iw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)) + testutil.Ok(t, err) + mi.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)) } err = iw.Close() @@ -364,7 +457,7 @@ func TestPersistence_index_e2e(t *testing.T) { testutil.Equals(t, explset, lset) testutil.Equals(t, expchks, chks) } - testutil.Assert(t, expp.Next() == false, "") + testutil.Assert(t, expp.Next() == false, "Unexpected Next() for "+p.Name+" "+p.Value) testutil.Ok(t, gotp.Err()) } diff --git a/tsdb/index/postingsstats_test.go b/tsdb/index/postingsstats_test.go index d4ca758ff..33e4f80c2 100644 --- a/tsdb/index/postingsstats_test.go +++ b/tsdb/index/postingsstats_test.go @@ -13,7 +13,6 @@ package index import ( - "fmt" "testing" "github.com/prometheus/prometheus/util/testutil" @@ -36,7 +35,6 @@ func TestPostingsStats(t *testing.T) { data := stats.get() testutil.Equals(t, 10, len(data)) for i := 0; i < heapLength; i++ { - fmt.Printf("%d", data[i].Count) testutil.Equals(t, uint64(max-i), data[i].Count) } diff --git a/tsdb/querier.go b/tsdb/querier.go index 59ffdaa85..f1f2c5520 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -385,7 +385,8 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, // If there's nothing to subtract from, add in everything and remove the notIts later. if len(its) == 0 && len(notIts) != 0 { - allPostings, err := ix.Postings(index.AllPostingsKey()) + k, v := index.AllPostingsKey() + allPostings, err := ix.Postings(k, v) if err != nil { return nil, err } @@ -413,7 +414,8 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro if m.Type == labels.MatchRegexp { setMatches := findSetMatches(m.Value) if len(setMatches) > 0 { - return postingsForSetMatcher(ix, m.Name, setMatches) + sort.Strings(setMatches) + return ix.Postings(m.Name, setMatches...) } } @@ -437,17 +439,7 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro return index.EmptyPostings(), nil } - var rit []index.Postings - - for _, v := range res { - it, err := ix.Postings(m.Name, v) - if err != nil { - return nil, err - } - rit = append(rit, it) - } - - return index.Merge(rit...), nil + return ix.Postings(m.Name, res...) } // inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher. @@ -469,29 +461,7 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting } } - var rit []index.Postings - for _, v := range res { - it, err := ix.Postings(m.Name, v) - if err != nil { - return nil, err - } - - rit = append(rit, it) - } - - return index.Merge(rit...), nil -} - -func postingsForSetMatcher(ix IndexReader, name string, matches []string) (index.Postings, error) { - var its []index.Postings - for _, match := range matches { - if it, err := ix.Postings(name, match); err == nil { - its = append(its, it) - } else { - return nil, err - } - } - return index.Merge(its...), nil + return ix.Postings(m.Name, res...) } func mergeStrings(a, b []string) []string { diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 0029c60c0..03fd867f0 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1376,9 +1376,13 @@ func (m mockIndex) LabelValues(names ...string) (index.StringTuples, error) { return index.NewStringTuples(m.labelIndex[names[0]], 1) } -func (m mockIndex) Postings(name, value string) (index.Postings, error) { - l := labels.Label{Name: name, Value: value} - return index.NewListPostings(m.postings[l]), nil +func (m mockIndex) Postings(name string, values ...string) (index.Postings, error) { + res := make([]index.Postings, 0, len(values)) + for _, value := range values { + l := labels.Label{Name: name, Value: value} + res = append(res, index.NewListPostings(m.postings[l])) + } + return index.Merge(res...), nil } func (m mockIndex) SortedPostings(p index.Postings) index.Postings {