mirror of https://github.com/prometheus/prometheus
Merge pull request #27 from Gouthamve/master
Add mockIndex And Refactor Tests To Use Thatpull/5805/head
commit
1afd8080f7
4
index.go
4
index.go
|
@ -40,9 +40,10 @@ type IndexWriter interface {
|
||||||
WriteLabelIndex(names []string, values []string) error
|
WriteLabelIndex(names []string, values []string) error
|
||||||
|
|
||||||
// WritePostings writes a postings list for a single label pair.
|
// WritePostings writes a postings list for a single label pair.
|
||||||
|
// The Postings here contain refs to the series that were added.
|
||||||
WritePostings(name, value string, it Postings) error
|
WritePostings(name, value string, it Postings) error
|
||||||
|
|
||||||
// Close writes any finalization and closes theresources associated with
|
// Close writes any finalization and closes the resources associated with
|
||||||
// the underlying writer.
|
// the underlying writer.
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
@ -416,6 +417,7 @@ type IndexReader interface {
|
||||||
LabelValues(names ...string) (StringTuples, error)
|
LabelValues(names ...string) (StringTuples, error)
|
||||||
|
|
||||||
// Postings returns the postings list iterator for the label pair.
|
// Postings returns the postings list iterator for the label pair.
|
||||||
|
// The Postings here contain the offsets to the series inside the index.
|
||||||
Postings(name, value string) (Postings, error)
|
Postings(name, value string) (Postings, error)
|
||||||
|
|
||||||
// Series returns the series for the given reference.
|
// Series returns the series for the given reference.
|
||||||
|
|
184
index_test.go
184
index_test.go
|
@ -9,35 +9,135 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/fabxc/tsdb/labels"
|
"github.com/fabxc/tsdb/labels"
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockIndexReader struct {
|
type series struct {
|
||||||
labelValues func(...string) (StringTuples, error)
|
l labels.Labels
|
||||||
postings func(string, string) (Postings, error)
|
chunks []*ChunkMeta
|
||||||
series func(uint32) (labels.Labels, []ChunkMeta, error)
|
|
||||||
labelIndices func() ([][]string, error)
|
|
||||||
close func() error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ir *mockIndexReader) LabelValues(names ...string) (StringTuples, error) {
|
type mockIndex struct {
|
||||||
return ir.labelValues(names...)
|
series map[uint32]series
|
||||||
|
labelIndex map[string][]string
|
||||||
|
postings map[labels.Label]Postings
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ir *mockIndexReader) Postings(name, value string) (Postings, error) {
|
func newMockIndex() mockIndex {
|
||||||
return ir.postings(name, value)
|
return mockIndex{
|
||||||
|
series: make(map[uint32]series),
|
||||||
|
labelIndex: make(map[string][]string),
|
||||||
|
postings: make(map[labels.Label]Postings),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ir *mockIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
|
func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error {
|
||||||
return ir.series(ref)
|
if _, ok := m.series[ref]; ok {
|
||||||
|
return errors.Errorf("series with reference %d already added", ref)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.series[ref] = series{
|
||||||
|
l: l,
|
||||||
|
chunks: chunks,
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ir *mockIndexReader) LabelIndices() ([][]string, error) {
|
func (m mockIndex) WriteLabelIndex(names []string, values []string) error {
|
||||||
return ir.labelIndices()
|
// TODO support composite indexes
|
||||||
|
if len(names) != 1 {
|
||||||
|
return errors.New("composite indexes not supported yet")
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Strings(values)
|
||||||
|
m.labelIndex[names[0]] = values
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ir *mockIndexReader) Close() error {
|
func (m mockIndex) WritePostings(name, value string, it Postings) error {
|
||||||
return ir.close()
|
lbl := labels.Label{
|
||||||
|
Name: name,
|
||||||
|
Value: value,
|
||||||
|
}
|
||||||
|
|
||||||
|
type refdSeries struct {
|
||||||
|
ref uint32
|
||||||
|
series series
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-Order so that the list is ordered by labels of the series.
|
||||||
|
// Internally that is how the series are laid out.
|
||||||
|
refs := make([]refdSeries, 0)
|
||||||
|
for it.Next() {
|
||||||
|
s, ok := m.series[it.At()]
|
||||||
|
if !ok {
|
||||||
|
return errors.Errorf("series for reference %d not found", it.At())
|
||||||
|
}
|
||||||
|
refs = append(refs, refdSeries{it.At(), s})
|
||||||
|
}
|
||||||
|
if err := it.Err(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(refs, func(i, j int) bool {
|
||||||
|
return labels.Compare(refs[i].series.l, refs[j].series.l) < 0
|
||||||
|
})
|
||||||
|
|
||||||
|
postings := make([]uint32, 0, len(refs))
|
||||||
|
for _, r := range refs {
|
||||||
|
postings = append(postings, r.ref)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.postings[lbl] = newListPostings(postings)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockIndex) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockIndex) LabelValues(names ...string) (StringTuples, error) {
|
||||||
|
// TODO support composite indexes
|
||||||
|
if len(names) != 1 {
|
||||||
|
return nil, errors.New("composite indexes not supported yet")
|
||||||
|
}
|
||||||
|
|
||||||
|
return newStringTuples(m.labelIndex[names[0]], 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockIndex) Postings(name, value string) (Postings, error) {
|
||||||
|
lbl := labels.Label{
|
||||||
|
Name: name,
|
||||||
|
Value: value,
|
||||||
|
}
|
||||||
|
|
||||||
|
p, ok := m.postings[lbl]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockIndex) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
||||||
|
s, ok := m.series[ref]
|
||||||
|
if !ok {
|
||||||
|
return nil, nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.l, s.chunks, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m mockIndex) LabelIndices() ([][]string, error) {
|
||||||
|
res := make([][]string, 0, len(m.labelIndex))
|
||||||
|
|
||||||
|
for k := range m.labelIndex {
|
||||||
|
res = append(res, []string{k})
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIndexRW_Create_Open(t *testing.T) {
|
func TestIndexRW_Create_Open(t *testing.T) {
|
||||||
|
@ -145,8 +245,12 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||||
values = map[string]stringset{}
|
values = map[string]stringset{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
mi := newMockIndex()
|
||||||
|
|
||||||
for i, s := range input {
|
for i, s := range input {
|
||||||
iw.AddSeries(uint32(i), s.labels, s.chunks...)
|
err = iw.AddSeries(uint32(i), s.labels, s.chunks...)
|
||||||
|
require.NoError(t, err)
|
||||||
|
mi.AddSeries(uint32(i), s.labels, s.chunks...)
|
||||||
|
|
||||||
for _, l := range s.labels {
|
for _, l := range s.labels {
|
||||||
valset, ok := values[l.Name]
|
valset, ok := values[l.Name]
|
||||||
|
@ -166,6 +270,13 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||||
}
|
}
|
||||||
err = iw.WritePostings("", "", newListPostings(all))
|
err = iw.WritePostings("", "", newListPostings(all))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
mi.WritePostings("", "", newListPostings(all))
|
||||||
|
|
||||||
|
for tm := range postings.m {
|
||||||
|
err = iw.WritePostings(tm.name, tm.value, postings.get(tm))
|
||||||
|
require.NoError(t, err)
|
||||||
|
mi.WritePostings(tm.name, tm.value, postings.get(tm))
|
||||||
|
}
|
||||||
|
|
||||||
err = iw.Close()
|
err = iw.Close()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -173,37 +284,26 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||||
ir, err := newIndexReader(dir)
|
ir, err := newIndexReader(dir)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
allp, err := ir.Postings("", "")
|
for p := range mi.postings {
|
||||||
require.NoError(t, err)
|
gotp, err := ir.Postings(p.Name, p.Value)
|
||||||
|
|
||||||
var result indexWriterSeriesSlice
|
|
||||||
|
|
||||||
for allp.Next() {
|
|
||||||
ref := allp.At()
|
|
||||||
|
|
||||||
lset, chks, err := ir.Series(ref)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
result = append(result, &indexWriterSeries{
|
expp, err := mi.Postings(p.Name, p.Value)
|
||||||
offset: ref,
|
|
||||||
labels: lset,
|
|
||||||
chunks: chks,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
require.NoError(t, allp.Err())
|
|
||||||
|
|
||||||
// Persisted data must be sorted.
|
for gotp.Next() {
|
||||||
sort.IsSorted(result)
|
require.True(t, expp.Next())
|
||||||
|
|
||||||
// Validate result contents.
|
ref := gotp.At()
|
||||||
sort.Sort(input)
|
|
||||||
require.Equal(t, len(input), len(result))
|
|
||||||
|
|
||||||
for i, re := range result {
|
lset, chks, err := ir.Series(ref)
|
||||||
exp := input[i]
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, exp.labels, re.labels)
|
explset, expchks, err := mi.Series(expp.At())
|
||||||
require.Equal(t, exp.chunks, re.chunks)
|
require.Equal(t, explset, lset)
|
||||||
|
require.Equal(t, expchks, chks)
|
||||||
|
}
|
||||||
|
require.False(t, expp.Next())
|
||||||
|
require.NoError(t, gotp.Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, ir.Close())
|
require.NoError(t, ir.Close())
|
||||||
|
|
Loading…
Reference in New Issue