diff --git a/index.go b/index.go index 0108358f4..7a4790af0 100644 --- a/index.go +++ b/index.go @@ -40,9 +40,10 @@ type IndexWriter interface { WriteLabelIndex(names []string, values []string) error // WritePostings writes a postings list for a single label pair. + // The Postings here contain refs to the series that were added. WritePostings(name, value string, it Postings) error - // Close writes any finalization and closes theresources associated with + // Close writes any finalization and closes the resources associated with // the underlying writer. Close() error } @@ -416,6 +417,7 @@ type IndexReader interface { LabelValues(names ...string) (StringTuples, error) // Postings returns the postings list iterator for the label pair. + // The Postings here contain the offsets to the series inside the index. Postings(name, value string) (Postings, error) // Series returns the series for the given reference. diff --git a/index_test.go b/index_test.go index 94e62ae55..d84e7ab4e 100644 --- a/index_test.go +++ b/index_test.go @@ -9,35 +9,135 @@ import ( "testing" "github.com/fabxc/tsdb/labels" + "github.com/pkg/errors" "github.com/stretchr/testify/require" ) -type mockIndexReader struct { - labelValues func(...string) (StringTuples, error) - postings func(string, string) (Postings, error) - series func(uint32) (labels.Labels, []ChunkMeta, error) - labelIndices func() ([][]string, error) - close func() error +type series struct { + l labels.Labels + chunks []*ChunkMeta } -func (ir *mockIndexReader) LabelValues(names ...string) (StringTuples, error) { - return ir.labelValues(names...) +type mockIndex struct { + series map[uint32]series + labelIndex map[string][]string + postings map[labels.Label]Postings } -func (ir *mockIndexReader) Postings(name, value string) (Postings, error) { - return ir.postings(name, value) +func newMockIndex() mockIndex { + return mockIndex{ + series: make(map[uint32]series), + labelIndex: make(map[string][]string), + postings: make(map[labels.Label]Postings), + } } -func (ir *mockIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { - return ir.series(ref) +func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error { + if _, ok := m.series[ref]; ok { + return errors.Errorf("series with reference %d already added", ref) + } + + m.series[ref] = series{ + l: l, + chunks: chunks, + } + + return nil } -func (ir *mockIndexReader) LabelIndices() ([][]string, error) { - return ir.labelIndices() +func (m mockIndex) WriteLabelIndex(names []string, values []string) error { + // TODO support composite indexes + if len(names) != 1 { + return errors.New("composite indexes not supported yet") + } + + sort.Strings(values) + m.labelIndex[names[0]] = values + return nil } -func (ir *mockIndexReader) Close() error { - return ir.close() +func (m mockIndex) WritePostings(name, value string, it Postings) error { + lbl := labels.Label{ + Name: name, + Value: value, + } + + type refdSeries struct { + ref uint32 + series series + } + + // Re-Order so that the list is ordered by labels of the series. + // Internally that is how the series are laid out. + refs := make([]refdSeries, 0) + for it.Next() { + s, ok := m.series[it.At()] + if !ok { + return errors.Errorf("series for reference %d not found", it.At()) + } + refs = append(refs, refdSeries{it.At(), s}) + } + if err := it.Err(); err != nil { + return err + } + + sort.Slice(refs, func(i, j int) bool { + return labels.Compare(refs[i].series.l, refs[j].series.l) < 0 + }) + + postings := make([]uint32, 0, len(refs)) + for _, r := range refs { + postings = append(postings, r.ref) + } + + m.postings[lbl] = newListPostings(postings) + return nil +} + +func (m mockIndex) Close() error { + return nil +} + +func (m mockIndex) LabelValues(names ...string) (StringTuples, error) { + // TODO support composite indexes + if len(names) != 1 { + return nil, errors.New("composite indexes not supported yet") + } + + return newStringTuples(m.labelIndex[names[0]], 1) +} + +func (m mockIndex) Postings(name, value string) (Postings, error) { + lbl := labels.Label{ + Name: name, + Value: value, + } + + p, ok := m.postings[lbl] + if !ok { + return nil, ErrNotFound + } + + return p, nil +} + +func (m mockIndex) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { + s, ok := m.series[ref] + if !ok { + return nil, nil, ErrNotFound + } + + return s.l, s.chunks, nil +} + +func (m mockIndex) LabelIndices() ([][]string, error) { + res := make([][]string, 0, len(m.labelIndex)) + + for k := range m.labelIndex { + res = append(res, []string{k}) + } + + return res, nil } func TestIndexRW_Create_Open(t *testing.T) { @@ -145,8 +245,12 @@ func TestPersistence_index_e2e(t *testing.T) { values = map[string]stringset{} ) + mi := newMockIndex() + for i, s := range input { - iw.AddSeries(uint32(i), s.labels, s.chunks...) + err = iw.AddSeries(uint32(i), s.labels, s.chunks...) + require.NoError(t, err) + mi.AddSeries(uint32(i), s.labels, s.chunks...) for _, l := range s.labels { valset, ok := values[l.Name] @@ -166,6 +270,13 @@ func TestPersistence_index_e2e(t *testing.T) { } err = iw.WritePostings("", "", newListPostings(all)) require.NoError(t, err) + mi.WritePostings("", "", newListPostings(all)) + + for tm := range postings.m { + err = iw.WritePostings(tm.name, tm.value, postings.get(tm)) + require.NoError(t, err) + mi.WritePostings(tm.name, tm.value, postings.get(tm)) + } err = iw.Close() require.NoError(t, err) @@ -173,37 +284,26 @@ func TestPersistence_index_e2e(t *testing.T) { ir, err := newIndexReader(dir) require.NoError(t, err) - allp, err := ir.Postings("", "") - require.NoError(t, err) - - var result indexWriterSeriesSlice - - for allp.Next() { - ref := allp.At() - - lset, chks, err := ir.Series(ref) + for p := range mi.postings { + gotp, err := ir.Postings(p.Name, p.Value) require.NoError(t, err) - result = append(result, &indexWriterSeries{ - offset: ref, - labels: lset, - chunks: chks, - }) - } - require.NoError(t, allp.Err()) + expp, err := mi.Postings(p.Name, p.Value) - // Persisted data must be sorted. - sort.IsSorted(result) + for gotp.Next() { + require.True(t, expp.Next()) - // Validate result contents. - sort.Sort(input) - require.Equal(t, len(input), len(result)) + ref := gotp.At() - for i, re := range result { - exp := input[i] + lset, chks, err := ir.Series(ref) + require.NoError(t, err) - require.Equal(t, exp.labels, re.labels) - require.Equal(t, exp.chunks, re.chunks) + explset, expchks, err := mi.Series(expp.At()) + require.Equal(t, explset, lset) + require.Equal(t, expchks, chks) + } + require.False(t, expp.Next()) + require.NoError(t, gotp.Err()) } require.NoError(t, ir.Close())