/api/v1/labels endpoint for getting all label names (#4835)

* vendor: update tsdb

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* /api/v1/labels endpoint

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* regex matchers for API

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Add docs

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Matchers behaving as OR

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Removed the matchers

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* vendor: update tsdb using go mod

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* vendor update: tsdb

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Added LabelNames() to storage.Querier

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Test for api.labelNames

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Nits

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
pull/4884/head
Ganesh Vernekar 2018-11-19 15:51:14 +05:30 committed by Goutham Veeramachaneni
parent 11b336e3ca
commit ca93fd544b
33 changed files with 554 additions and 168 deletions

View File

@ -236,6 +236,49 @@ $ curl -g 'http://localhost:9090/api/v1/series?match[]=up&match[]=process_start_
}
```
### Getting label names
The following endpoint returns a list of label names:
```
GET /api/v1/labels
POST /api/v1/labels
```
The `data` section of the JSON response is a list of string label names.
Here is an example.
```json
$ curl 'localhost:9090/api/v1/labels'
{
"status": "success",
"data": [
"__name__",
"call",
"code",
"config",
"dialer_name",
"endpoint",
"event",
"goversion",
"handler",
"instance",
"interval",
"job",
"le",
"listener_name",
"name",
"quantile",
"reason",
"role",
"scrape_job",
"slice",
"version"
]
}
```
### Querying label values
The following endpoint returns a list of label values for a provided label name:
@ -244,7 +287,7 @@ The following endpoint returns a list of label values for a provided label name:
GET /api/v1/label/<label_name>/values
```
The `data` section of the JSON response is a list of string label names.
The `data` section of the JSON response is a list of string label values.
This example queries for all label values for the `job` label:

2
go.mod
View File

@ -105,7 +105,7 @@ require (
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1
github.com/prometheus/procfs v0.0.0-20160411190841-abf152e5f3e9 // indirect
github.com/prometheus/tsdb v0.0.0-20181003080831-0ce41118ed20
github.com/prometheus/tsdb v0.2.0
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/rlmcpherson/s3gof3r v0.5.0 // indirect
github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect

4
go.sum
View File

@ -215,8 +215,8 @@ github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1 h1:osmNoEW2SCW3L
github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20160411190841-abf152e5f3e9 h1:IrO4Eb9oGw+GxzOhO4b2QC5EWO85Omh/4iTSPZktMm8=
github.com/prometheus/procfs v0.0.0-20160411190841-abf152e5f3e9/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/tsdb v0.0.0-20181003080831-0ce41118ed20 h1:Jh/eKJuru9z9u3rUGdQ8gYc3aZmCGkjXT3gmy0Ex8W8=
github.com/prometheus/tsdb v0.0.0-20181003080831-0ce41118ed20/go.mod h1:lFf/o1J2a31WmWQbxYXfY1azJK5Xp5D8hwKMnVMBTGU=
github.com/prometheus/tsdb v0.2.0 h1:27z98vFd/gPew17nmKEbLn37exGCwc2F5EyrgScg6bk=
github.com/prometheus/tsdb v0.2.0/go.mod h1:lFf/o1J2a31WmWQbxYXfY1azJK5Xp5D8hwKMnVMBTGU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rlmcpherson/s3gof3r v0.5.0 h1:1izOJpTiohSibfOHuNyEA/yQnAirh05enzEdmhez43k=

View File

@ -173,6 +173,7 @@ func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.
return errSeriesSet{err: q.err}, q.err
}
func (*errQuerier) LabelValues(name string) ([]string, error) { return nil, nil }
func (*errQuerier) LabelNames() ([]string, error) { return nil, nil }
func (*errQuerier) Close() error { return nil }
// errSeriesSet implements storage.SeriesSet which always returns error.

View File

@ -16,10 +16,12 @@ package storage
import (
"container/heap"
"context"
"sort"
"strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
@ -280,6 +282,28 @@ func mergeTwoStringSlices(a, b []string) []string {
return result
}
// LabelNames returns all the unique label names present in the block in sorted order.
func (q *mergeQuerier) LabelNames() ([]string, error) {
labelNamesMap := make(map[string]struct{})
for _, b := range q.queriers {
names, err := b.LabelNames()
if err != nil {
return nil, errors.Wrap(err, "LabelNames() from Querier")
}
for _, name := range names {
labelNamesMap[name] = struct{}{}
}
}
labelNames := make([]string, 0, len(labelNamesMap))
for name := range labelNamesMap {
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, nil
}
// Close releases the resources of the Querier.
func (q *mergeQuerier) Close() error {
// TODO return multiple errors?

View File

@ -57,6 +57,9 @@ type Querier interface {
// LabelValues returns all potential values for a label name.
LabelValues(name string) ([]string, error)
// LabelNames returns all the unique label names present in the block in sorted order.
LabelNames() ([]string, error)
// Close releases the resources of the Querier.
Close() error
}

View File

@ -34,6 +34,10 @@ func (noopQuerier) LabelValues(name string) ([]string, error) {
return nil, nil
}
func (noopQuerier) LabelNames() ([]string, error) {
return nil, nil
}
func (noopQuerier) Close() error {
return nil
}

View File

@ -83,6 +83,12 @@ func (q *querier) LabelValues(name string) ([]string, error) {
return nil, nil
}
// LabelNames implements storage.Querier and is a noop.
func (q *querier) LabelNames() ([]string, error) {
// TODO implement?
return nil, nil
}
// Close implements storage.Querier and is a noop.
func (q *querier) Close() error {
return nil

View File

@ -202,6 +202,7 @@ func (q querier) Select(_ *storage.SelectParams, oms ...*labels.Matcher) (storag
}
func (q querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) }
func (q querier) LabelNames() ([]string, error) { return q.q.LabelNames() }
func (q querier) Close() error { return q.q.Close() }
type seriesSet struct {

View File

@ -13,6 +13,5 @@ install:
- go get -v -t ./...
script:
# `check_license` target is omitted due to some missing license headers
# `staticcheck` target is omitted due to linting errors
- make style unused test
- make check_license style unused test

4
vendor/github.com/prometheus/tsdb/MAINTAINERS.md generated vendored Normal file
View File

@ -0,0 +1,4 @@
Maintainers of this repository:
* Krasi Georgiev <kgeorgie@redhat.com> @krasi-georgiev
* Goutham Veeramachaneni <gouthamve@gmail.com> @gouthamve

View File

@ -7,6 +7,8 @@ This repository contains the Prometheus storage layer that is used in its 2.x re
A writeup of its design can be found [here](https://fabxc.org/blog/2017-04-10-writing-a-tsdb/).
Based on the Gorilla TSDB [white papers](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
Video: [Storing 16 Bytes at Scale](https://youtu.be/b_pEevMAC3I) from [PromCon 2017](https://promcon.io/2017-munich/).
See also the [format documentation](docs/format/README.md).

View File

@ -83,8 +83,12 @@ type IndexReader interface {
Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error
// LabelIndices returns a list of string tuples for which a label value index exists.
// NOTE: This is deprecated. Use `LabelNames()` instead.
LabelIndices() ([][]string, error)
// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames() ([]string, error)
// Close releases the underlying resources of the reader.
Close() error
}
@ -407,6 +411,10 @@ func (r blockIndexReader) LabelIndices() ([][]string, error) {
return ss, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
func (r blockIndexReader) LabelNames() ([]string, error) {
return r.b.LabelNames()
}
func (r blockIndexReader) Close() error {
r.b.pendingReaders.Done()
return nil
@ -449,7 +457,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := pb.indexr
// Choose only valid postings which have chunks in the time-range.
stones := NewMemTombstones()
stones := newMemTombstones()
var lset labels.Labels
var chks []chunks.Meta
@ -557,13 +565,18 @@ func (pb *Block) Snapshot(dir string) error {
return nil
}
// Returns true if the block overlaps [mint, maxt].
// OverlapsClosedInterval returns true if the block overlaps [mint, maxt].
func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
// The block itself is a half-open interval
// [pb.meta.MinTime, pb.meta.MaxTime).
return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
}
// LabelNames returns all the unique label names present in the Block in sorted order.
func (pb *Block) LabelNames() ([]string, error) {
return pb.indexr.LabelNames()
}
func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if a < mint {
a = mint

View File

@ -38,7 +38,7 @@ type CheckpointStats struct {
TotalTombstones int // Processed tombstones including dropped ones.
}
// LastCheckpoint returns the directory name of the most recent checkpoint.
// LastCheckpoint returns the directory name and index of the most recent checkpoint.
// If dir does not contain any checkpoints, ErrNotFound is returned.
func LastCheckpoint(dir string) (string, int, error) {
files, err := ioutil.ReadDir(dir)
@ -55,18 +55,17 @@ func LastCheckpoint(dir string) (string, int, error) {
if !fi.IsDir() {
return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name())
}
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil {
continue
}
return fi.Name(), k, nil
return fi.Name(), idx, nil
}
return "", 0, ErrNotFound
}
// DeleteCheckpoints deletes all checkpoints in dir that have an index
// below n.
func DeleteCheckpoints(dir string, n int) error {
// DeleteCheckpoints deletes all checkpoints in a directory below a given index.
func DeleteCheckpoints(dir string, maxIndex int) error {
var errs MultiError
files, err := ioutil.ReadDir(dir)
@ -77,8 +76,8 @@ func DeleteCheckpoints(dir string, n int) error {
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil || k >= n {
index, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil || index >= maxIndex {
continue
}
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
@ -90,7 +89,7 @@ func DeleteCheckpoints(dir string, n int) error {
const checkpointPrefix = "checkpoint."
// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL.
// Checkpoint creates a compacted checkpoint of segments in range [first, last] in the given WAL.
// It includes the most recent checkpoint if it exists.
// All series not satisfying keep and samples below mint are dropped.
//
@ -98,7 +97,7 @@ const checkpointPrefix = "checkpoint."
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
stats := &CheckpointStats{}
var sr io.Reader
@ -107,27 +106,28 @@ func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*C
// files if there is an error somewhere.
var closers []io.Closer
{
lastFn, k, err := LastCheckpoint(w.Dir())
dir, idx, err := LastCheckpoint(w.Dir())
if err != nil && err != ErrNotFound {
return nil, errors.Wrap(err, "find last checkpoint")
}
last := idx + 1
if err == nil {
if m > k+1 {
return nil, errors.New("unexpected gap to last checkpoint")
if from > last {
return nil, fmt.Errorf("unexpected gap to last checkpoint. expected:%v, requested:%v", last, from)
}
// Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
m = k + 1
from = last
last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn))
r, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), dir))
if err != nil {
return nil, errors.Wrap(err, "open last checkpoint")
}
defer last.Close()
closers = append(closers, last)
sr = last
defer r.Close()
closers = append(closers, r)
sr = r
}
segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n)
segsr, err := wal.NewSegmentsRangeReader(w.Dir(), from, to)
if err != nil {
return nil, errors.Wrap(err, "create segment reader")
}
@ -141,7 +141,7 @@ func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*C
}
}
cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n))
cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to))
cpdirtmp := cpdir + ".tmp"
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {

View File

@ -59,7 +59,9 @@ type Compactor interface {
// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
Compact(dest string, dirs ...string) (ulid.ULID, error)
// Can optionally pass a list of already open blocks,
// to avoid having to reopen them.
Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
}
// LeveledCompactor implements the Compactor interface.
@ -317,26 +319,41 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
// Compact creates a new block in the compactor's directory from the blocks in the
// provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, err error) {
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
var (
blocks []BlockReader
bs []*Block
metas []*BlockMeta
uids []string
)
start := time.Now()
for _, d := range dirs {
b, err := OpenBlock(d, c.chunkPool)
if err != nil {
return uid, err
}
defer b.Close()
meta, err := readMetaFile(d)
if err != nil {
return uid, err
}
var b *Block
// Use already open blocks if we can, to avoid
// having the index data in memory twice.
for _, o := range open {
if meta.ULID == o.Meta().ULID {
b = o
break
}
}
if b == nil {
var err error
b, err = OpenBlock(d, c.chunkPool)
if err != nil {
return uid, err
}
defer b.Close()
}
metas = append(metas, meta)
blocks = append(blocks, b)
bs = append(bs, b)
@ -356,6 +373,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
return uid, nil
}
@ -489,7 +507,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}
// Create an empty tombstones file.
if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil {
if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}
@ -524,6 +542,10 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
if len(blocks) == 0 {
return errors.New("cannot populate block from no readers")
}
var (
set ChunkSeriesSet
allSymbols = make(map[string]struct{}, 1<<16)
@ -595,13 +617,17 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
continue
}
if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values.
for i, chk := range chks {
for i, chk := range chks {
if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
}
if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values.
if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
continue
}
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
@ -617,6 +643,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
chks[i].Chunk = newChunk
}
}
if err := chunkw.WriteChunks(chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
@ -791,7 +818,6 @@ func (c *compactionMerger) Next() bool {
var chks []chunks.Meta
d := c.compare()
// Both sets contain the current series. Chain them into a single one.
if d > 0 {
lset, chks, c.intervals = c.b.At()
c.l = append(c.l[:0], lset...)
@ -805,8 +831,10 @@ func (c *compactionMerger) Next() bool {
c.aok = c.a.Next()
} else {
// Both sets contain the current series. Chain them into a single one.
l, ca, ra := c.a.At()
_, cb, rb := c.b.At()
for _, r := range rb {
ra = ra.add(r)
}

View File

@ -429,7 +429,7 @@ func (db *DB) compact() (err error) {
default:
}
if _, err := db.compactor.Compact(db.dir, plan...); err != nil {
if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil {
return errors.Wrapf(err, "compact %s", plan)
}
runtime.GC()
@ -793,7 +793,11 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
}
}
if maxt >= db.head.MinTime() {
blocks = append(blocks, db.head)
blocks = append(blocks, &rangeHead{
head: db.head,
mint: mint,
maxt: maxt,
})
}
sq := &querier{

View File

@ -1,3 +1,16 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (

View File

@ -1,3 +1,16 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package fileutil provides utility methods used when dealing with the filesystem in tsdb.
// It is largely copied from github.com/coreos/etcd/pkg/fileutil to avoid the
// dependency chain it brings with it.

View File

@ -1,3 +1,16 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (

View File

@ -1,3 +1,16 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build windows
package fileutil

View File

@ -1,3 +1,16 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build windows
package fileutil

View File

@ -225,7 +225,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
values: map[string]stringset{},
symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(),
tombstones: NewMemTombstones(),
tombstones: newMemTombstones(),
}
h.metrics = newHeadMetrics(h, r)
@ -237,22 +237,28 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
// Samples before the mint timestamp are discarded.
func (h *Head) processWALSamples(
minValidTime int64,
partition, total uint64,
input <-chan []RefSample, output chan<- []RefSample,
) (unknownRefs uint64) {
defer close(output)
// Mitigate lock contention in getByID.
refSeries := map[uint64]*memSeries{}
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range input {
for _, s := range samples {
if s.T < minValidTime || s.Ref%total != partition {
if s.T < minValidTime {
continue
}
ms := h.series.getByID(s.Ref)
ms := refSeries[s.Ref]
if ms == nil {
unknownRefs++
continue
ms = h.series.getByID(s.Ref)
if ms == nil {
unknownRefs++
continue
}
refSeries[s.Ref] = ms
}
_, chunkCreated := ms.append(s.T, s.V)
if chunkCreated {
@ -310,25 +316,22 @@ func (h *Head) loadWAL(r *wal.Reader) error {
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var (
wg sync.WaitGroup
n = runtime.GOMAXPROCS(0)
firstInput = make(chan []RefSample, 300)
input = firstInput
wg sync.WaitGroup
n = runtime.GOMAXPROCS(0)
inputs = make([]chan []RefSample, n)
outputs = make([]chan []RefSample, n)
)
wg.Add(n)
for i := 0; i < n; i++ {
output := make(chan []RefSample, 300)
outputs[i] = make(chan []RefSample, 300)
inputs[i] = make(chan []RefSample, 300)
go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output)
go func(input <-chan []RefSample, output chan<- []RefSample) {
unknown := h.processWALSamples(minValidTime, input, output)
atomic.AddUint64(&unknownRefs, unknown)
wg.Done()
}(i, input, output)
// The output feeds the next worker goroutine. For the last worker,
// it feeds the initial input again to reuse the RefSample slices.
input = output
}(inputs[i], outputs[i])
}
var (
@ -336,6 +339,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
series []RefSeries
samples []RefSample
tstones []Stone
err error
)
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]
@ -343,7 +347,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
switch dec.Type(rec) {
case RecordSeries:
series, err := dec.Series(rec, series)
series, err = dec.Series(rec, series)
if err != nil {
return errors.Wrap(err, "decode series")
}
@ -355,7 +359,8 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
}
case RecordSamples:
samples, err := dec.Samples(rec, samples)
samples, err = dec.Samples(rec, samples)
s := samples
if err != nil {
return errors.Wrap(err, "decode samples")
}
@ -364,20 +369,31 @@ func (h *Head) loadWAL(r *wal.Reader) error {
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
n := 5000
if len(samples) < n {
n = len(samples)
m := 5000
if len(samples) < m {
m = len(samples)
}
var buf []RefSample
select {
case buf = <-input:
default:
shards := make([][]RefSample, n)
for i := 0; i < n; i++ {
var buf []RefSample
select {
case buf = <-outputs[i]:
default:
}
shards[i] = buf[:0]
}
firstInput <- append(buf[:0], samples[:n]...)
samples = samples[n:]
for _, sam := range samples[:m] {
mod := sam.Ref % uint64(n)
shards[mod] = append(shards[mod], sam)
}
for i := 0; i < n; i++ {
inputs[i] <- shards[i]
}
samples = samples[m:]
}
samples = s // Keep whole slice for reuse.
case RecordTombstones:
tstones, err := dec.Tombstones(rec, tstones)
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
return errors.Wrap(err, "decode tombstones")
}
@ -397,9 +413,11 @@ func (h *Head) loadWAL(r *wal.Reader) error {
return errors.Wrap(r.Err(), "read records")
}
// Signal termination to first worker and wait for last one to close its output channel.
close(firstInput)
for range input {
// Signal termination to each worker and wait for it to close its output channel.
for i := 0; i < n; i++ {
close(inputs[i])
for range outputs[i] {
}
}
wg.Wait()
@ -418,12 +436,12 @@ func (h *Head) Init() error {
}
// Backfill the checkpoint first if it exists.
cp, n, err := LastCheckpoint(h.wal.Dir())
dir, startFrom, err := LastCheckpoint(h.wal.Dir())
if err != nil && err != ErrNotFound {
return errors.Wrap(err, "find last checkpoint")
}
if err == nil {
sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp))
sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), dir))
if err != nil {
return errors.Wrap(err, "open checkpoint")
}
@ -434,11 +452,11 @@ func (h *Head) Init() error {
if err := h.loadWAL(wal.NewReader(sr)); err != nil {
return errors.Wrap(err, "backfill checkpoint")
}
n++
startFrom++
}
// Backfill segments from the last checkpoint onwards
sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1)
sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), startFrom, -1)
if err != nil {
return errors.Wrap(err, "open WAL segments")
}
@ -493,18 +511,18 @@ func (h *Head) Truncate(mint int64) (err error) {
}
start = time.Now()
m, n, err := h.wal.Segments()
first, last, err := h.wal.Segments()
if err != nil {
return errors.Wrap(err, "get segment range")
}
n-- // Never consider last segment for checkpoint.
if n < 0 {
last-- // Never consider last segment for checkpoint.
if last < 0 {
return nil // no segments yet.
}
// The lower third of segments should contain mostly obsolete samples.
// If we have less than three segments, it's not worth checkpointing yet.
n = m + (n-m)/3
if n <= m {
last = first + (last-first)/3
if last <= first {
return nil
}
@ -512,18 +530,18 @@ func (h *Head) Truncate(mint int64) (err error) {
return h.series.getByID(id) != nil
}
h.metrics.checkpointCreationTotal.Inc()
if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil {
if _, err = Checkpoint(h.wal, first, last, keep, mint); err != nil {
h.metrics.checkpointCreationFail.Inc()
return errors.Wrap(err, "create checkpoint")
}
if err := h.wal.Truncate(n + 1); err != nil {
if err := h.wal.Truncate(last + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
}
h.metrics.checkpointDeleteTotal.Inc()
if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil {
if err := DeleteCheckpoints(h.wal.Dir(), last); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
@ -533,7 +551,7 @@ func (h *Head) Truncate(mint int64) (err error) {
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
level.Info(h.logger).Log("msg", "WAL checkpoint complete",
"low", m, "high", n, "duration", time.Since(start))
"first", first, "last", last, "duration", time.Since(start))
return nil
}
@ -1014,19 +1032,33 @@ func (h *headIndexReader) LabelValues(names ...string) (index.StringTuples, erro
if len(names) != 1 {
return nil, errInvalidSize
}
var sl []string
h.head.symMtx.RLock()
defer h.head.symMtx.RUnlock()
sl := make([]string, 0, len(h.head.values[names[0]]))
for s := range h.head.values[names[0]] {
sl = append(sl, s)
}
h.head.symMtx.RUnlock()
sort.Strings(sl)
return index.NewStringTuples(sl, len(names))
}
// LabelNames returns all the unique label names present in the head.
func (h *headIndexReader) LabelNames() ([]string, error) {
h.head.symMtx.RLock()
defer h.head.symMtx.RUnlock()
labelNames := make([]string, 0, len(h.head.values))
for name := range h.head.values {
if name == "" {
continue
}
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, nil
}
// Postings returns the postings list iterator for the label pair.
func (h *headIndexReader) Postings(name, value string) (index.Postings, error) {
return h.head.postings.Get(name, value), nil
@ -1088,9 +1120,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
func (h *headIndexReader) LabelIndices() ([][]string, error) {
h.head.symMtx.RLock()
defer h.head.symMtx.RUnlock()
res := [][]string{}
for s := range h.head.values {
res = append(res, []string{s})
}
@ -1313,6 +1343,14 @@ type sample struct {
v float64
}
func (s sample) T() int64 {
return s.t
}
func (s sample) V() float64 {
return s.v
}
// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it.
type memSeries struct {
@ -1321,11 +1359,11 @@ type memSeries struct {
ref uint64
lset labels.Labels
chunks []*memChunk
headChunk *memChunk
chunkRange int64
firstChunkID int
nextAt int64 // Timestamp at which to cut the next chunk.
lastValue float64
sampleBuf [4]sample
pendingCommit bool // Whether there are samples waiting to be committed to this series.
@ -1354,6 +1392,7 @@ func (s *memSeries) cut(mint int64) *memChunk {
maxTime: math.MinInt64,
}
s.chunks = append(s.chunks, c)
s.headChunk = c
// Set upper bound on when the next chunk must be started. An earlier timestamp
// may be chosen dynamically at a later point.
@ -1392,7 +1431,7 @@ func (s *memSeries) appendable(t int64, v float64) error {
}
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
if math.Float64bits(s.lastValue) != math.Float64bits(v) {
if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
return ErrAmendSample
}
return nil
@ -1422,12 +1461,20 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
}
s.chunks = append(s.chunks[:0], s.chunks[k:]...)
s.firstChunkID += k
if len(s.chunks) == 0 {
s.headChunk = nil
} else {
s.headChunk = s.chunks[len(s.chunks)-1]
}
return k
}
// append adds the sample (t, v) to the series.
func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
// Based on Gorilla white papers this offers near-optimal compression ratio
// so anything bigger that this has diminishing returns and increases
// the time range within which we have to decompress all samples.
const samplesPerChunk = 120
c := s.head()
@ -1456,8 +1503,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
c.maxTime = t
s.lastValue = v
s.sampleBuf[0] = s.sampleBuf[1]
s.sampleBuf[1] = s.sampleBuf[2]
s.sampleBuf[2] = s.sampleBuf[3]
@ -1501,10 +1546,7 @@ func (s *memSeries) iterator(id int) chunkenc.Iterator {
}
func (s *memSeries) head() *memChunk {
if len(s.chunks) == 0 {
return nil
}
return s.chunks[len(s.chunks)-1]
return s.headChunk
}
type memChunk struct {

View File

@ -1,3 +1,16 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package index
import (

View File

@ -38,6 +38,8 @@ const (
indexFormatV1 = 1
indexFormatV2 = 2
labelNameSeperator = "\xff"
)
type indexWriterSeries struct {
@ -850,9 +852,8 @@ func (r *Reader) SymbolTable() map[uint32]string {
// LabelValues returns value tuples that exist for the given label name tuples.
func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
const sep = "\xff"
key := strings.Join(names, sep)
key := strings.Join(names, labelNameSeperator)
off, ok := r.labels[key]
if !ok {
// XXX(fabxc): hot fix. Should return a partial data error and handle cases
@ -882,14 +883,12 @@ type emptyStringTuples struct{}
func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil }
func (emptyStringTuples) Len() int { return 0 }
// LabelIndices returns a for which labels or label tuples value indices exist.
// LabelIndices returns a slice of label names for which labels or label tuples value indices exist.
// NOTE: This is deprecated. Use `LabelNames()` instead.
func (r *Reader) LabelIndices() ([][]string, error) {
const sep = "\xff"
res := [][]string{}
for s := range r.labels {
res = append(res, strings.Split(s, sep))
res = append(res, strings.Split(s, labelNameSeperator))
}
return res, nil
}
@ -935,6 +934,30 @@ func (r *Reader) SortedPostings(p Postings) Postings {
return p
}
// LabelNames returns all the unique label names present in the index.
func (r *Reader) LabelNames() ([]string, error) {
labelNamesMap := make(map[string]struct{}, len(r.labels))
for key := range r.labels {
// 'key' contains the label names concatenated with the
// delimiter 'labelNameSeperator'.
names := strings.Split(key, labelNameSeperator)
for _, name := range names {
if name == allPostingsKey.Name {
// This is not from any metric.
// It is basically an empty label name.
continue
}
labelNamesMap[name] = struct{}{}
}
}
labelNames := make([]string, 0, len(labelNamesMap))
for name := range labelNamesMap {
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, nil
}
type stringTuples struct {
length int // tuple length
entries []string // flattened tuple entries

View File

@ -117,11 +117,13 @@ func New(ls ...Label) Labels {
// FromMap returns new sorted Labels from the given map.
func FromMap(m map[string]string) Labels {
l := make([]Label, 0, len(m))
l := make(Labels, 0, len(m))
for k, v := range m {
l = append(l, Label{Name: k, Value: v})
}
return New(l...)
sort.Sort(l)
return l
}
// FromStrings creates new labels from pairs of strings.

View File

@ -33,10 +33,14 @@ type Querier interface {
// LabelValues returns all potential values for a label name.
LabelValues(string) ([]string, error)
// LabelValuesFor returns all potential values for a label name.
// under the constraint of another label.
LabelValuesFor(string, labels.Label) ([]string, error)
// LabelNames returns all the unique label names present in the block in sorted order.
LabelNames() ([]string, error)
// Close releases the resources of the Querier.
Close() error
}
@ -60,6 +64,28 @@ func (q *querier) LabelValues(n string) ([]string, error) {
return q.lvals(q.blocks, n)
}
// LabelNames returns all the unique label names present querier blocks.
func (q *querier) LabelNames() ([]string, error) {
labelNamesMap := make(map[string]struct{})
for _, b := range q.blocks {
names, err := b.LabelNames()
if err != nil {
return nil, errors.Wrap(err, "LabelNames() from Querier")
}
for _, name := range names {
labelNamesMap[name] = struct{}{}
}
}
labelNames := make([]string, 0, len(labelNamesMap))
for name := range labelNamesMap {
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, nil
}
func (q *querier) lvals(qs []Querier, n string) ([]string, error) {
if len(qs) == 0 {
return nil, nil
@ -187,6 +213,10 @@ func (q *blockQuerier) LabelValues(name string) ([]string, error) {
return res, nil
}
func (q *blockQuerier) LabelNames() ([]string, error) {
return q.index.LabelNames()
}
func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
@ -249,7 +279,7 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
}
func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error) {
// If the matcher selects an empty value, it selects all the series which dont
// If the matcher selects an empty value, it selects all the series which don't
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
if m.Matches("") {
@ -478,7 +508,7 @@ type baseChunkSeries struct {
// over them. It drops chunks based on tombstones in the given reader.
func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
if tr == nil {
tr = NewMemTombstones()
tr = newMemTombstones()
}
p, err := PostingsForMatchers(ir, ms...)
if err != nil {
@ -499,8 +529,8 @@ func (s *baseChunkSeries) Err() error { return s.err }
func (s *baseChunkSeries) Next() bool {
var (
lset labels.Labels
chkMetas []chunks.Meta
lset = make(labels.Labels, len(s.lset))
chkMetas = make([]chunks.Meta, len(s.chks))
err error
)

View File

@ -26,22 +26,16 @@ import (
type RecordType uint8
const (
RecordInvalid RecordType = 255
RecordSeries RecordType = 1
RecordSamples RecordType = 2
// RecordInvalid is returned for unrecognised WAL record types.
RecordInvalid RecordType = 255
// RecordSeries is used to match WAL records of type Series.
RecordSeries RecordType = 1
// RecordSamples is used to match WAL records of type Samples.
RecordSamples RecordType = 2
// RecordTombstones is used to match WAL records of type Tombstones.
RecordTombstones RecordType = 3
)
type RecordLogger interface {
Log(recs ...[]byte) error
}
type RecordReader interface {
Next() bool
Err() error
Record() []byte
}
// RecordDecoder decodes series, sample, and tombstone records.
// The zero value is ready to use.
type RecordDecoder struct {

View File

@ -1,3 +1,16 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (

View File

@ -29,7 +29,7 @@ const tombstoneFilename = "tombstones"
const (
// MagicTombstone is 4 bytes at the head of a tombstone file.
MagicTombstone = 0x130BA30
MagicTombstone = 0x0130BA30
tombstoneFormatV1 = 1
)
@ -113,10 +113,10 @@ type Stone struct {
intervals Intervals
}
func readTombstones(dir string) (*memTombstones, error) {
func readTombstones(dir string) (TombstoneReader, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if os.IsNotExist(err) {
return NewMemTombstones(), nil
return newMemTombstones(), nil
} else if err != nil {
return nil, err
}
@ -146,7 +146,7 @@ func readTombstones(dir string) (*memTombstones, error) {
return nil, errors.New("checksum did not match")
}
stonesMap := NewMemTombstones()
stonesMap := newMemTombstones()
for d.len() > 0 {
k := d.uvarint64()
@ -167,7 +167,9 @@ type memTombstones struct {
mtx sync.RWMutex
}
func NewMemTombstones() *memTombstones {
// newMemTombstones creates new in memory TombstoneReader
// that allows adding new intervals.
func newMemTombstones() *memTombstones {
return &memTombstones{intvlGroups: make(map[uint64]Intervals)}
}
@ -208,7 +210,7 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
}
}
func (memTombstones) Close() error {
func (*memTombstones) Close() error {
return nil
}

View File

@ -46,6 +46,10 @@ const (
// before.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
// page is an in memory buffer used to batch disk writes.
// Records bigger than the page size are split and flushed separately.
// A flush is triggered when a single records doesn't fit the page size or
// when the next record can't fit in the remaining free page space.
type page struct {
alloc int
flushed int
@ -92,8 +96,9 @@ func (e *CorruptionErr) Error() string {
}
// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
func OpenWriteSegment(dir string, k int) (*Segment, error) {
f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666)
func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) {
segName := SegmentName(dir, k)
f, err := os.OpenFile(segName, os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
@ -108,6 +113,7 @@ func OpenWriteSegment(dir string, k int) (*Segment, error) {
// If it was torn mid-record, a full read (which the caller should do anyway
// to ensure integrity) will detect it as a corruption by the end.
if d := stat.Size() % pageSize; d != 0 {
level.Warn(logger).Log("msg", "last page of the wal is torn, filling it with zeros", "segment", segName)
if _, err := f.Write(make([]byte, pageSize-d)); err != nil {
f.Close()
return nil, errors.Wrap(err, "zero-pad torn page")
@ -225,7 +231,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
return nil, err
}
} else {
if w.segment, err = OpenWriteSegment(w.dir, j); err != nil {
if w.segment, err = OpenWriteSegment(w.logger, w.dir, j); err != nil {
return nil, err
}
// Correctly initialize donePages.
@ -289,13 +295,13 @@ func (w *WAL) Repair(origErr error) error {
if err != nil {
return errors.Wrap(err, "list segments")
}
level.Warn(w.logger).Log("msg", "deleting all segments behind corruption")
level.Warn(w.logger).Log("msg", "deleting all segments behind corruption", "segment", cerr.Segment)
for _, s := range segs {
if s.n <= cerr.Segment {
if s.index <= cerr.Segment {
continue
}
if w.segment.i == s.n {
if w.segment.i == s.index {
// The active segment needs to be removed,
// close it first (Windows!). Can be closed safely
// as we set the current segment to repaired file
@ -304,14 +310,14 @@ func (w *WAL) Repair(origErr error) error {
return errors.Wrap(err, "close active segment")
}
}
if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil {
return errors.Wrap(err, "delete segment")
if err := os.Remove(filepath.Join(w.dir, s.name)); err != nil {
return errors.Wrapf(err, "delete segment:%v", s.index)
}
}
// Regardless of the corruption offset, no record reaches into the previous segment.
// So we can safely repair the WAL by removing the segment and re-inserting all
// its records up to the corruption.
level.Warn(w.logger).Log("msg", "rewrite corrupted segment")
level.Warn(w.logger).Log("msg", "rewrite corrupted segment", "segment", cerr.Segment)
fn := SegmentName(w.dir, cerr.Segment)
tmpfn := fn + ".repair"
@ -397,7 +403,7 @@ func (w *WAL) flushPage(clear bool) error {
// No more data will fit into the page. Enqueue and clear it.
if clear {
p.alloc = pageSize // write till end of page
p.alloc = pageSize // Write till end of page.
w.pageCompletions.Inc()
}
n, err := w.segment.Write(p.buf[p.flushed:p.alloc])
@ -465,13 +471,14 @@ func (w *WAL) Log(recs ...[]byte) error {
}
// log writes rec to the log and forces a flush of the current page if its
// the final record of a batch.
// the final record of a batch, the record is bigger than the page size or
// the current page is full.
func (w *WAL) log(rec []byte, final bool) error {
// If the record is too big to fit within pages in the current
// If the record is too big to fit within the active page in the current
// segment, terminate the active segment and advance to the next one.
// This ensures that records do not cross segment boundaries.
left := w.page.remaining() - recordHeaderSize // Active pages.
left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages.
left := w.page.remaining() - recordHeaderSize // Free space in the active page.
left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages in the active segment.
if len(rec) > left {
if err := w.nextSegment(); err != nil {
@ -511,7 +518,9 @@ func (w *WAL) log(rec []byte, final bool) error {
copy(buf[recordHeaderSize:], part)
p.alloc += len(part) + recordHeaderSize
// If we wrote a full record, we can fit more records of the batch
// By definition when a record is split it means its size is bigger than
// the page boundary so the current page would be full and needs to be flushed.
// On contrary if we wrote a full record, we can fit more records of the batch
// into the page before flushing it.
if final || typ != recFull || w.page.full() {
if err := w.flushPage(false); err != nil {
@ -523,9 +532,9 @@ func (w *WAL) log(rec []byte, final bool) error {
return nil
}
// Segments returns the range [m, n] of currently existing segments.
// If no segments are found, m and n are -1.
func (w *WAL) Segments() (m, n int, err error) {
// Segments returns the range [first, n] of currently existing segments.
// If no segments are found, first and n are -1.
func (w *WAL) Segments() (first, last int, err error) {
refs, err := listSegments(w.dir)
if err != nil {
return 0, 0, err
@ -533,7 +542,7 @@ func (w *WAL) Segments() (m, n int, err error) {
if len(refs) == 0 {
return -1, -1, nil
}
return refs[0].n, refs[len(refs)-1].n, nil
return refs[0].index, refs[len(refs)-1].index, nil
}
// Truncate drops all segments before i.
@ -549,10 +558,10 @@ func (w *WAL) Truncate(i int) (err error) {
return err
}
for _, r := range refs {
if r.n >= i {
if r.index >= i {
break
}
if err = os.Remove(filepath.Join(w.dir, r.s)); err != nil {
if err = os.Remove(filepath.Join(w.dir, r.name)); err != nil {
return err
}
}
@ -595,8 +604,8 @@ func (w *WAL) Close() (err error) {
}
type segmentRef struct {
s string
n int
name string
index int
}
func listSegments(dir string) (refs []segmentRef, err error) {
@ -613,11 +622,11 @@ func listSegments(dir string) (refs []segmentRef, err error) {
if len(refs) > 0 && k > last+1 {
return nil, errors.New("segments are not sequential")
}
refs = append(refs, segmentRef{s: fn, n: k})
refs = append(refs, segmentRef{name: fn, index: k})
last = k
}
sort.Slice(refs, func(i, j int) bool {
return refs[i].n < refs[j].n
return refs[i].index < refs[j].index
})
return refs, nil
}
@ -628,8 +637,8 @@ func NewSegmentsReader(dir string) (io.ReadCloser, error) {
}
// NewSegmentsRangeReader returns a new reader over the given WAL segment range.
// If m or n are -1, the range is open on the respective end.
func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) {
// If first or last are -1, the range is open on the respective end.
func NewSegmentsRangeReader(dir string, first, last int) (io.ReadCloser, error) {
refs, err := listSegments(dir)
if err != nil {
return nil, err
@ -637,13 +646,13 @@ func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) {
var segs []*Segment
for _, r := range refs {
if m >= 0 && r.n < m {
if first >= 0 && r.index < first {
continue
}
if n >= 0 && r.n > n {
if last >= 0 && r.index > last {
break
}
s, err := OpenReadSegment(filepath.Join(dir, r.s))
s, err := OpenReadSegment(filepath.Join(dir, r.name))
if err != nil {
return nil, err
}
@ -745,6 +754,10 @@ func (r *Reader) next() (err error) {
// Gobble up zero bytes.
if typ == recPageTerm {
// recPageTerm is a single byte that indicates that the rest of the page is padded.
// If it's the first byte in a page, buf is too small and we have to resize buf to fit pageSize-1 bytes.
buf = r.buf[1:]
// We are pedantic and check whether the zeros are actually up
// to a page boundary.
// It's not strictly necessary but may catch sketchy state early.

2
vendor/modules.txt vendored
View File

@ -186,7 +186,7 @@ github.com/prometheus/common/route
github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg
# github.com/prometheus/procfs v0.0.0-20160411190841-abf152e5f3e9
github.com/prometheus/procfs
# github.com/prometheus/tsdb v0.0.0-20181003080831-0ce41118ed20
# github.com/prometheus/tsdb v0.2.0
github.com/prometheus/tsdb
github.com/prometheus/tsdb/labels
github.com/prometheus/tsdb/chunkenc

View File

@ -228,6 +228,8 @@ func (api *API) Register(r *route.Router) {
r.Get("/query_range", wrap(api.queryRange))
r.Post("/query_range", wrap(api.queryRange))
r.Get("/labels", wrap(api.labelNames))
r.Post("/labels", wrap(api.labelNames))
r.Get("/label/:name/values", wrap(api.labelValues))
r.Get("/series", wrap(api.series))
@ -390,6 +392,20 @@ func returnAPIError(err error) *apiError {
return &apiError{errorExec, err}
}
func (api *API) labelNames(r *http.Request) (interface{}, *apiError, func()) {
q, err := api.Queryable.Querier(r.Context(), math.MinInt64, math.MaxInt64)
if err != nil {
return nil, &apiError{errorExec, err}, nil
}
defer q.Close()
names, err := q.LabelNames()
if err != nil {
return nil, &apiError{errorExec, err}, nil
}
return names, nil, nil
}
func (api *API) labelValues(r *http.Request) (interface{}, *apiError, func()) {
ctx := r.Context()
name := route.Param(ctx, "name")

View File

@ -248,9 +248,9 @@ func TestEndpoints(t *testing.T) {
QueryEngine: suite.QueryEngine(),
targetRetriever: testTargetRetriever{},
alertmanagerRetriever: testAlertmanagerRetriever{},
flagsMap: sampleFlagMap,
now: func() time.Time { return now },
config: func() config.Config { return samplePrometheusCfg },
flagsMap: sampleFlagMap,
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
rulesRetriever: algr,
}
@ -301,15 +301,51 @@ func TestEndpoints(t *testing.T) {
QueryEngine: suite.QueryEngine(),
targetRetriever: testTargetRetriever{},
alertmanagerRetriever: testAlertmanagerRetriever{},
flagsMap: sampleFlagMap,
now: func() time.Time { return now },
config: func() config.Config { return samplePrometheusCfg },
flagsMap: sampleFlagMap,
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
rulesRetriever: algr,
}
testEndpoints(t, api, false)
})
}
func TestLabelNames(t *testing.T) {
// TestEndpoints doesn't have enough label names to test api.labelNames
// endpoint properly. Hence we test it separately.
suite, err := promql.NewTest(t, `
load 1m
test_metric1{foo1="bar", baz="abc"} 0+100x100
test_metric1{foo2="boo"} 1+0x100
test_metric2{foo="boo"} 1+0x100
test_metric2{foo="boo", xyz="qwerty"} 1+0x100
`)
testutil.Ok(t, err)
defer suite.Close()
testutil.Ok(t, suite.Run())
api := &API{
Queryable: suite.Storage(),
}
request := func(m string) (*http.Request, error) {
if m == http.MethodPost {
r, err := http.NewRequest(m, "http://example.com", nil)
r.Header.Set("Content-Type", "application/x-www-form-urlencoded")
return r, err
}
return http.NewRequest(m, "http://example.com", nil)
}
for _, method := range []string{http.MethodGet, http.MethodPost} {
ctx := context.Background()
req, err := request(method)
testutil.Ok(t, err)
resp, apiErr, _ := api.labelNames(req.WithContext(ctx))
assertAPIError(t, apiErr, "")
assertAPIResponse(t, resp, []string{"__name__", "baz", "foo", "foo1", "foo2", "xyz"})
}
}
func setupRemote(s storage.Storage) *httptest.Server {
@ -776,6 +812,11 @@ func testEndpoints(t *testing.T, api *API, testLabelAPI bool) {
},
errType: errorBadData,
},
// Label names.
{
endpoint: api.labelNames,
response: []string{"__name__", "foo"},
},
}...)
}