LabelNames() method to get all unique label names (#369)

* LabelNames() method to get all unique label names

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
pull/5805/head
Ganesh Vernekar 2018-11-07 21:22:41 +05:30 committed by Krasi Georgiev
parent a95323c021
commit 3a08a71d86
7 changed files with 207 additions and 15 deletions

View File

@ -83,8 +83,12 @@ type IndexReader interface {
Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error
// LabelIndices returns a list of string tuples for which a label value index exists.
// NOTE: This is deprecated. Use `LabelNames()` instead.
LabelIndices() ([][]string, error)
// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames() ([]string, error)
// Close releases the underlying resources of the reader.
Close() error
}
@ -407,6 +411,10 @@ func (r blockIndexReader) LabelIndices() ([][]string, error) {
return ss, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
func (r blockIndexReader) LabelNames() ([]string, error) {
return r.b.LabelNames()
}
func (r blockIndexReader) Close() error {
r.b.pendingReaders.Done()
return nil
@ -564,6 +572,11 @@ func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
}
// LabelNames returns all the unique label names present in the Block in sorted order.
func (pb *Block) LabelNames() ([]string, error) {
return pb.indexr.LabelNames()
}
func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if a < mint {
a = mint

43
db.go
View File

@ -878,6 +878,49 @@ func (db *DB) CleanTombstones() (err error) {
return errors.Wrap(db.reload(), "reload blocks")
}
// labelNames returns all the unique label names from the Block Readers.
func labelNames(brs ...BlockReader) (map[string]struct{}, error) {
labelNamesMap := make(map[string]struct{})
for _, br := range brs {
ir, err := br.Index()
if err != nil {
return nil, errors.Wrap(err, "get IndexReader")
}
names, err := ir.LabelNames()
if err != nil {
return nil, errors.Wrap(err, "LabelNames() from IndexReader")
}
for _, name := range names {
labelNamesMap[name] = struct{}{}
}
if err = ir.Close(); err != nil {
return nil, errors.Wrap(err, "close IndexReader")
}
}
return labelNamesMap, nil
}
// LabelNames returns all the unique label names present in the DB in sorted order.
func (db *DB) LabelNames() ([]string, error) {
brs := []BlockReader{db.head}
for _, b := range db.Blocks() {
brs = append(brs, b)
}
labelNamesMap, err := labelNames(brs...)
if err != nil {
return nil, err
}
labelNames := make([]string, 0, len(labelNamesMap))
for name := range labelNamesMap {
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, nil
}
func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false

View File

@ -781,7 +781,7 @@ func TestTombstoneClean(t *testing.T) {
testutil.Equals(t, smplExp, smplRes)
}
for _, b := range db.blocks {
for _, b := range db.Blocks() {
testutil.Equals(t, NewMemTombstones(), b.tombstones)
}
}
@ -1134,7 +1134,7 @@ func TestChunkAtBlockBoundary(t *testing.T) {
err = db.compact()
testutil.Ok(t, err)
for _, block := range db.blocks {
for _, block := range db.Blocks() {
r, err := block.Index()
testutil.Ok(t, err)
defer r.Close()
@ -1303,6 +1303,101 @@ func TestInitializeHeadTimestamp(t *testing.T) {
})
}
func TestDB_LabelNames(t *testing.T) {
tests := []struct {
// Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk ->
// -> Add 'sampleLabels2' -> Test Head+Disk
sampleLabels1 [][2]string // For checking head and disk separately.
// To test Head+Disk, sampleLabels2 should have
// at least 1 unique label name which is not in sampleLabels1.
sampleLabels2 [][2]string // // For checking head and disk together.
exp1 []string // after adding sampleLabels1.
exp2 []string // after adding sampleLabels1 and sampleLabels2.
}{
{
sampleLabels1: [][2]string{
[2]string{"name1", ""},
[2]string{"name3", ""},
[2]string{"name2", ""},
},
sampleLabels2: [][2]string{
[2]string{"name4", ""},
[2]string{"name1", ""},
},
exp1: []string{"name1", "name2", "name3"},
exp2: []string{"name1", "name2", "name3", "name4"},
},
{
sampleLabels1: [][2]string{
[2]string{"name2", ""},
[2]string{"name1", ""},
[2]string{"name2", ""},
},
sampleLabels2: [][2]string{
[2]string{"name6", ""},
[2]string{"name0", ""},
},
exp1: []string{"name1", "name2"},
exp2: []string{"name0", "name1", "name2", "name6"},
},
}
blockRange := DefaultOptions.BlockRanges[0]
// Appends samples into the database.
appendSamples := func(db *DB, mint, maxt int64, sampleLabels [][2]string) {
t.Helper()
app := db.Appender()
for i := mint; i <= maxt; i++ {
for _, tuple := range sampleLabels {
label := labels.FromStrings(tuple[0], tuple[1])
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
}
}
err := app.Commit()
testutil.Ok(t, err)
}
for _, tst := range tests {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()
appendSamples(db, 0, 4, tst.sampleLabels1)
// Testing head.
headIndexr, err := db.head.Index()
testutil.Ok(t, err)
labelNames, err := headIndexr.LabelNames()
testutil.Ok(t, err)
testutil.Equals(t, tst.exp1, labelNames)
testutil.Ok(t, headIndexr.Close())
// Testing disk.
err = db.compact()
testutil.Ok(t, err)
// All blocks have same label names, hence check them individually.
// No need to aggregrate and check.
for _, b := range db.Blocks() {
blockIndexr, err := b.Index()
testutil.Ok(t, err)
labelNames, err = blockIndexr.LabelNames()
testutil.Ok(t, err)
testutil.Equals(t, tst.exp1, labelNames)
testutil.Ok(t, blockIndexr.Close())
}
// Addings more samples to head with new label names
// so that we can test db.LabelNames() (the union).
appendSamples(db, 5, 9, tst.sampleLabels2)
// Testing DB (union).
labelNames, err = db.LabelNames()
testutil.Ok(t, err)
testutil.Equals(t, tst.exp2, labelNames)
}
}
func TestCorrectNumTombstones(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()

17
head.go
View File

@ -1026,6 +1026,21 @@ func (h *headIndexReader) LabelValues(names ...string) (index.StringTuples, erro
return index.NewStringTuples(sl, len(names))
}
// LabelNames returns all the unique label names present in the head.
func (h *headIndexReader) LabelNames() ([]string, error) {
h.head.symMtx.RLock()
defer h.head.symMtx.RUnlock()
labelNames := make([]string, 0, len(h.head.values))
for name := range h.head.values {
if name == "" {
continue
}
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, nil
}
// Postings returns the postings list iterator for the label pair.
func (h *headIndexReader) Postings(name, value string) (index.Postings, error) {
return h.head.postings.Get(name, value), nil
@ -1087,9 +1102,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
func (h *headIndexReader) LabelIndices() ([][]string, error) {
h.head.symMtx.RLock()
defer h.head.symMtx.RUnlock()
res := [][]string{}
for s := range h.head.values {
res = append(res, []string{s})
}

View File

@ -38,6 +38,8 @@ const (
indexFormatV1 = 1
indexFormatV2 = 2
labelNameSeperator = "\xff"
)
type indexWriterSeries struct {
@ -850,9 +852,8 @@ func (r *Reader) SymbolTable() map[uint32]string {
// LabelValues returns value tuples that exist for the given label name tuples.
func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
const sep = "\xff"
key := strings.Join(names, sep)
key := strings.Join(names, labelNameSeperator)
off, ok := r.labels[key]
if !ok {
// XXX(fabxc): hot fix. Should return a partial data error and handle cases
@ -882,14 +883,12 @@ type emptyStringTuples struct{}
func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil }
func (emptyStringTuples) Len() int { return 0 }
// LabelIndices returns a for which labels or label tuples value indices exist.
// LabelIndices returns a slice of label names for which labels or label tuples value indices exist.
// NOTE: This is deprecated. Use `LabelNames()` instead.
func (r *Reader) LabelIndices() ([][]string, error) {
const sep = "\xff"
res := [][]string{}
for s := range r.labels {
res = append(res, strings.Split(s, sep))
res = append(res, strings.Split(s, labelNameSeperator))
}
return res, nil
}
@ -935,6 +934,30 @@ func (r *Reader) SortedPostings(p Postings) Postings {
return p
}
// LabelNames returns all the unique label names present in the index.
func (r *Reader) LabelNames() ([]string, error) {
labelNamesMap := make(map[string]struct{}, len(r.labels))
for key := range r.labels {
// 'key' contains the label names concatenated with the
// delimiter 'labelNameSeperator'.
names := strings.Split(key, labelNameSeperator)
for _, name := range names {
if name == allPostingsKey.Name {
// This is not from any metric.
// It is basically an empty label name.
continue
}
labelNamesMap[name] = struct{}{}
}
}
labelNames := make([]string, 0, len(labelNamesMap))
for name := range labelNamesMap {
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, nil
}
type stringTuples struct {
length int // tuple length
entries []string // flattened tuple entries

View File

@ -140,11 +140,9 @@ func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta)
func (m mockIndex) LabelIndices() ([][]string, error) {
res := make([][]string, 0, len(m.labelIndex))
for k := range m.labelIndex {
res = append(res, []string{k})
}
return res, nil
}

View File

@ -1507,10 +1507,17 @@ func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta)
func (m mockIndex) LabelIndices() ([][]string, error) {
res := make([][]string, 0, len(m.labelIndex))
for k := range m.labelIndex {
res = append(res, []string{k})
}
return res, nil
}
func (m mockIndex) LabelNames() ([]string, error) {
labelNames := make([]string, 0, len(m.labelIndex))
for name := range m.labelIndex {
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, nil
}