*: fully decouple tsdb, add new storage interfaces

pull/2643/head
Fabian Reinartz 2016-12-25 01:40:28 +01:00
parent 1becee3f6c
commit 0492ddbd4d
9 changed files with 602 additions and 310 deletions

pkg/labels/matcher.go (new file, 61 lines)

@@ -0,0 +1,61 @@
package labels
import (
"fmt"
"regexp"
)
// MatchType is an enum for label matching types.
type MatchType int
// Possible MatchTypes.
const (
MatchEqual MatchType = iota
MatchNotEqual
MatchRegexp
MatchNotRegexp
)
func (m MatchType) String() string {
typeToStr := map[MatchType]string{
MatchEqual: "=",
MatchNotEqual: "!=",
MatchRegexp: "=~",
MatchNotRegexp: "!~",
}
if str, ok := typeToStr[m]; ok {
return str
}
panic("unknown match type")
}
// Matcher models the matching of a label.
type Matcher struct {
Type MatchType
Name string
Value string
re *regexp.Regexp
}
// NewMatcher returns a matcher object.
func NewMatcher(t MatchType, n, v string) (*Matcher, error) {
m := &Matcher{
Type: t,
Name: n,
Value: v,
}
if t == MatchRegexp || t == MatchNotRegexp {
m.Value = "^(?:" + v + ")$"
re, err := regexp.Compile(m.Value)
if err != nil {
return nil, err
}
m.re = re
}
return m, nil
}
func (m *Matcher) String() string {
return fmt.Sprintf("%s%s%q", m.Name, m.Type, m.Value)
}
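For illustration, a small usage sketch of the new package (the label name and values are made up, not part of the change): NewMatcher anchors regexp values before compiling them, so the matcher prints with the rewritten pattern.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

func main() {
	// Regexp values are anchored by NewMatcher, so "node|kubelet"
	// becomes "^(?:node|kubelet)$" before compilation.
	m, err := labels.NewMatcher(labels.MatchRegexp, "job", "node|kubelet")
	if err != nil {
		panic(err)
	}
	fmt.Println(m) // job=~"^(?:node|kubelet)$"
}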


@@ -15,13 +15,10 @@ package promql
import (
"fmt"
- "regexp"
"time"
- "unsafe"
- "github.com/fabxc/tsdb"
- tsdbLabels "github.com/fabxc/tsdb/labels"
"github.com/prometheus/prometheus/pkg/labels"
+ "github.com/prometheus/prometheus/storage"
)
// Node is a generic interface for all nodes in an AST.
@@ -134,11 +131,11 @@ type MatrixSelector struct
Name string
Range time.Duration
Offset time.Duration
- LabelMatchers []*LabelMatcher
+ LabelMatchers []*labels.Matcher
// The series iterators are populated at query preparation time.
- series []tsdb.Series
- iterators []*tsdb.BufferedSeriesIterator
+ series []storage.Series
+ iterators []*storage.BufferedSeriesIterator
}
// NumberLiteral represents a number.
@@ -168,11 +165,11 @@ type UnaryExpr struct
type VectorSelector struct {
Name string
Offset time.Duration
- LabelMatchers []*LabelMatcher
+ LabelMatchers []*labels.Matcher
// The series iterators are populated at query preparation time.
- series []tsdb.Series
- iterators []*tsdb.BufferedSeriesIterator
+ series []storage.Series
+ iterators []*storage.BufferedSeriesIterator
}
func (e *AggregateExpr) Type() ValueType { return ValueTypeVector }
@@ -318,91 +315,3 @@ func (f inspector) Visit(node Node) Visitor {
func Inspect(node Node, f func(Node) bool) {
Walk(inspector(f), node)
}
- // MatchType is an enum for label matching types.
- type MatchType int
- // Possible MatchTypes.
- const (
- MatchEqual MatchType = iota
- MatchNotEqual
- MatchRegexp
- MatchNotRegexp
- )
- func (m MatchType) String() string {
- typeToStr := map[MatchType]string{
- MatchEqual: "=",
- MatchNotEqual: "!=",
- MatchRegexp: "=~",
- MatchNotRegexp: "!~",
- }
- if str, ok := typeToStr[m]; ok {
- return str
- }
- panic("unknown match type")
- }
- // LabelMatcher models the matching of a label.
- type LabelMatcher struct {
- Type MatchType
- Name string
- Value string
- re *regexp.Regexp
- }
- // NewLabelMatcher returns a LabelMatcher object ready to use.
- func NewLabelMatcher(t MatchType, n, v string) (*LabelMatcher, error) {
- m := &LabelMatcher{
- Type: t,
- Name: n,
- Value: v,
- }
- if t == MatchRegexp || t == MatchNotRegexp {
- m.Value = "^(?:" + v + ")$"
- re, err := regexp.Compile(m.Value)
- if err != nil {
- return nil, err
- }
- m.re = re
- }
- return m, nil
- }
- func toTSDBLabels(l labels.Labels) tsdbLabels.Labels {
- return *(*tsdbLabels.Labels)(unsafe.Pointer(&l))
- }
- func toLabels(l tsdbLabels.Labels) labels.Labels {
- return *(*labels.Labels)(unsafe.Pointer(&l))
- }
- func (m *LabelMatcher) String() string {
- return fmt.Sprintf("%s%s%q", m.Name, m.Type, m.Value)
- }
- func (m *LabelMatcher) matcher() tsdbLabels.Matcher {
- switch m.Type {
- case MatchEqual:
- return tsdbLabels.NewEqualMatcher(m.Name, m.Value)
- case MatchNotEqual:
- return tsdbLabels.Not(tsdbLabels.NewEqualMatcher(m.Name, m.Value))
- case MatchRegexp:
- res, err := tsdbLabels.NewRegexpMatcher(m.Name, m.Value)
- if err != nil {
- panic(err)
- }
- return res
- case MatchNotRegexp:
- res, err := tsdbLabels.NewRegexpMatcher(m.Name, m.Value)
- if err != nil {
- panic(err)
- }
- return tsdbLabels.Not(res)
- }
- panic("promql.LabelMatcher.matcher: invalid matcher type")
- }


@@ -22,10 +22,9 @@ import (
"strings"
"time"
- "github.com/fabxc/tsdb"
- tsdbLabels "github.com/fabxc/tsdb/labels"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/pkg/labels"
+ "github.com/prometheus/prometheus/storage"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/util/stats"
@@ -291,7 +290,7 @@ type Engine struct
// Queryable allows opening a storage querier.
type Queryable interface {
- Querier(mint, maxt int64) (tsdb.Querier, error)
+ Querier(mint, maxt int64) (storage.Querier, error)
}
// NewEngine returns a new engine.
@@ -526,7 +525,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
return resMatrix, nil
}
- func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (tsdb.Querier, error) {
+ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Querier, error) {
var maxOffset time.Duration
Inspect(s.Expr, func(node Node) bool {
@@ -553,31 +552,22 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (tsdb.Quer
Inspect(s.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
- sel := make(tsdbLabels.Selector, 0, len(n.LabelMatchers))
- for _, m := range n.LabelMatchers {
- sel = append(sel, m.matcher())
- }
- n.series, err = expandSeriesSet(querier.Select(sel...))
+ n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
if err != nil {
return false
}
for _, s := range n.series {
- it := tsdb.NewBuffer(s.Iterator(), durationMilliseconds(StalenessDelta))
+ it := storage.NewBuffer(s.Iterator(), durationMilliseconds(StalenessDelta))
n.iterators = append(n.iterators, it)
}
- case *MatrixSelector:
- sel := make(tsdbLabels.Selector, 0, len(n.LabelMatchers))
- for _, m := range n.LabelMatchers {
- sel = append(sel, m.matcher())
- }
- n.series, err = expandSeriesSet(querier.Select(sel...))
+ case *MatrixSelector:
+ n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
if err != nil {
return false
}
for _, s := range n.series {
- it := tsdb.NewBuffer(s.Iterator(), durationMilliseconds(n.Range))
+ it := storage.NewBuffer(s.Iterator(), durationMilliseconds(n.Range))
n.iterators = append(n.iterators, it)
}
}
@@ -586,7 +576,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (tsdb.Quer
return querier, err
}
- func expandSeriesSet(it tsdb.SeriesSet) (res []tsdb.Series, err error) {
+ func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) {
for it.Next() {
res = append(res, it.Series())
}
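To make the new data flow concrete, here is a hypothetical helper (not part of the commit) that does for one selector what populateIterators now does, using only the Querier interface as declared in storage/interface.go; the function name, window parameter, and error handling are illustrative.

package example

import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// expandAndBuffer mirrors populateIterators for a single selector: expand the
// SeriesSet returned by the querier and wrap each series iterator in a
// look-back buffer of the given window (in milliseconds).
func expandAndBuffer(q storage.Querier, window int64, ms ...*labels.Matcher) ([]storage.Series, []*storage.BufferedSeriesIterator, error) {
	set, err := q.Select(ms...)
	if err != nil {
		return nil, nil, err
	}
	var (
		series []storage.Series
		its    []*storage.BufferedSeriesIterator
	)
	for set.Next() {
		s := set.Series()
		series = append(series, s)
		its = append(its, storage.NewBuffer(s.Iterator(), window))
	}
	return series, its, set.Err()
}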

storage/buffer.go (new file, 190 lines)

@@ -0,0 +1,190 @@
package storage
import (
"math"
)
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
type BufferedSeriesIterator struct {
it SeriesIterator
buf *sampleRing
lastTime int64
}
// NewBuffer returns a new iterator that buffers the values within the time range
// of the current element and the duration of delta before.
func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator {
return &BufferedSeriesIterator{
it: it,
buf: newSampleRing(delta, 16),
lastTime: math.MinInt64,
}
}
// PeekBack returns the previous element of the iterator. If there is none buffered,
// ok is false.
func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) {
return b.buf.last()
}
// Buffer returns an iterator over the buffered data.
func (b *BufferedSeriesIterator) Buffer() SeriesIterator {
return b.buf.iterator()
}
// Seek advances the iterator to the element at time t or greater.
func (b *BufferedSeriesIterator) Seek(t int64) bool {
t0 := t - b.buf.delta
// If the delta would cause us to seek backwards, preserve the buffer
// and just continue regular advancement while filling the buffer on the way.
if t0 > b.lastTime {
b.buf.reset()
ok := b.it.Seek(t0)
if !ok {
return false
}
b.lastTime, _ = b.Values()
}
if b.lastTime >= t {
return true
}
for b.Next() {
if b.lastTime >= t {
return true
}
}
return false
}
// Next advances the iterator to the next element.
func (b *BufferedSeriesIterator) Next() bool {
// Add current element to buffer before advancing.
b.buf.add(b.it.Values())
ok := b.it.Next()
if ok {
b.lastTime, _ = b.Values()
}
return ok
}
// Values returns the current element of the iterator.
func (b *BufferedSeriesIterator) Values() (int64, float64) {
return b.it.Values()
}
// Err returns the last encountered error.
func (b *BufferedSeriesIterator) Err() error {
return b.it.Err()
}
type sample struct {
t int64
v float64
}
type sampleRing struct {
delta int64
buf []sample // lookback buffer
i int // position of most recent element in ring buffer
f int // position of first element in ring buffer
l int // number of elements in buffer
}
func newSampleRing(delta int64, sz int) *sampleRing {
r := &sampleRing{delta: delta, buf: make([]sample, sz)}
r.reset()
return r
}
func (r *sampleRing) reset() {
r.l = 0
r.i = -1
r.f = 0
}
func (r *sampleRing) iterator() SeriesIterator {
return &sampleRingIterator{r: r, i: -1}
}
type sampleRingIterator struct {
r *sampleRing
i int
}
func (it *sampleRingIterator) Next() bool {
it.i++
return it.i < it.r.l
}
func (it *sampleRingIterator) Seek(int64) bool {
return false
}
func (it *sampleRingIterator) Err() error {
return nil
}
func (it *sampleRingIterator) Values() (int64, float64) {
return it.r.at(it.i)
}
func (r *sampleRing) at(i int) (int64, float64) {
j := (r.f + i) % len(r.buf)
s := r.buf[j]
return s.t, s.v
}
// add adds a sample to the ring buffer and frees all samples that fall
// out of the delta range.
func (r *sampleRing) add(t int64, v float64) {
l := len(r.buf)
// Grow the ring buffer if it fits no more elements.
if l == r.l {
buf := make([]sample, 2*l)
copy(buf[l+r.f:], r.buf[r.f:])
copy(buf, r.buf[:r.f])
r.buf = buf
r.i = r.f
r.f += l
} else {
r.i++
if r.i >= l {
r.i -= l
}
}
r.buf[r.i] = sample{t: t, v: v}
r.l++
// Free head of the buffer of samples that just fell out of the range.
for r.buf[r.f].t < t-r.delta {
r.f++
if r.f >= l {
r.f -= l
}
r.l--
}
}
// last returns the most recent element added to the ring.
func (r *sampleRing) last() (int64, float64, bool) {
if r.l == 0 {
return 0, 0, false
}
s := r.buf[r.i]
return s.t, s.v, true
}
func (r *sampleRing) samples() []sample {
res := make([]sample, r.l)
var k = r.f + r.l
var j int
if k > len(r.buf) {
k = len(r.buf)
j = r.l - k + r.f
}
n := copy(res, r.buf[r.f:k])
copy(res[n:], r.buf[:j])
return res
}
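As a worked example of the look-back window, a hypothetical in-package helper (not part of this commit) that feeds five samples into a sampleRing with delta=2 and returns what stays buffered:

package storage

// exampleSampleRingWindow demonstrates the look-back window: with delta=2,
// adding t=1..5 frees every sample older than 5-2, so only t=3,4,5 remain.
func exampleSampleRingWindow() []sample {
	r := newSampleRing(2, 4)
	for t := int64(1); t <= 5; t++ {
		r.add(t, float64(t))
	}
	return r.samples() // [{3 3} {4 4} {5 5}]
}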

storage/buffer_test.go (new file, 123 lines)

@@ -0,0 +1,123 @@
package storage
import (
"math/rand"
"testing"
"github.com/stretchr/testify/require"
)
func TestSampleRing(t *testing.T) {
cases := []struct {
input []int64
delta int64
size int
}{
{
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
delta: 2,
size: 1,
},
{
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
delta: 2,
size: 2,
},
{
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
delta: 7,
size: 3,
},
{
input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20},
delta: 7,
size: 1,
},
}
for _, c := range cases {
r := newSampleRing(c.delta, c.size)
input := []sample{}
for _, t := range c.input {
input = append(input, sample{
t: t,
v: float64(rand.Intn(100)),
})
}
for i, s := range input {
r.add(s.t, s.v)
buffered := r.samples()
for _, sold := range input[:i] {
found := false
for _, bs := range buffered {
if bs.t == sold.t && bs.v == sold.v {
found = true
break
}
}
if sold.t >= s.t-c.delta && !found {
t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered)
}
if sold.t < s.t-c.delta && found {
t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered)
}
}
}
}
}
func TestBufferedSeriesIterator(t *testing.T) {
var it *BufferedSeriesIterator
bufferEq := func(exp []sample) {
var b []sample
bit := it.Buffer()
for bit.Next() {
t, v := bit.Values()
b = append(b, sample{t: t, v: v})
}
require.Equal(t, exp, b, "buffer mismatch")
}
sampleEq := func(ets int64, ev float64) {
ts, v := it.Values()
require.Equal(t, ets, ts, "timestamp mismatch")
require.Equal(t, ev, v, "value mismatch")
}
it = NewBuffer(newListSeriesIterator([]sample{
{t: 1, v: 2},
{t: 2, v: 3},
{t: 3, v: 4},
{t: 4, v: 5},
{t: 5, v: 6},
{t: 99, v: 8},
{t: 100, v: 9},
{t: 101, v: 10},
}), 2)
require.True(t, it.Seek(-123), "seek failed")
sampleEq(1, 2)
bufferEq(nil)
require.True(t, it.Next(), "next failed")
sampleEq(2, 3)
bufferEq([]sample{{t: 1, v: 2}})
require.True(t, it.Next(), "next failed")
require.True(t, it.Next(), "next failed")
require.True(t, it.Next(), "next failed")
sampleEq(5, 6)
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}})
require.True(t, it.Seek(5), "seek failed")
sampleEq(5, 6)
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}})
require.True(t, it.Seek(101), "seek failed")
sampleEq(101, 10)
bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}})
require.False(t, it.Next(), "next succeeded unexpectedly")
}
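The test relies on newListSeriesIterator, which does not appear in this excerpt; a minimal list-backed SeriesIterator along the following lines would satisfy it (a reconstruction for readability, with an illustrative linear-scan Seek).

// listSeriesIterator is a reconstruction of the helper used above: a
// SeriesIterator over an in-memory slice of samples, just enough for the test.
type listSeriesIterator struct {
	list []sample
	idx  int
}

func newListSeriesIterator(list []sample) *listSeriesIterator {
	return &listSeriesIterator{list: list, idx: -1}
}

func (it *listSeriesIterator) Values() (int64, float64) {
	s := it.list[it.idx]
	return s.t, s.v
}

func (it *listSeriesIterator) Next() bool {
	it.idx++
	return it.idx < len(it.list)
}

func (it *listSeriesIterator) Seek(t int64) bool {
	if it.idx == -1 {
		it.idx = 0
	}
	// Linear scan is enough for test-sized inputs.
	for ; it.idx < len(it.list); it.idx++ {
		if it.list[it.idx].t >= t {
			return true
		}
	}
	return false
}

func (it *listSeriesIterator) Err() error { return nil }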

storage/interface.go (new file, 93 lines)

@@ -0,0 +1,93 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/util/testutil"
)
var (
ErrOutOfOrderSample = errors.New("out of order sample")
ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp")
)
func NewTestStorage(t testutil.T) Storage {
return nil
}
// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
type Storage interface {
// Querier returns a new Querier on the storage.
Querier(mint, maxt int64) (Querier, error)
// Appender returns a new appender against the storage.
Appender() (Appender, error)
// Close closes the storage and all its underlying resources.
Close() error
}
// Querier provides reading access to time series data.
type Querier interface {
// Select returns a set of series that matches the given label matchers.
Select(...*labels.Matcher) (SeriesSet, error)
// LabelValues returns all potential values for a label name.
LabelValues(name string) ([]string, error)
// Close releases the resources of the Querier.
Close() error
}
// Appender provides batched appends against a storage.
type Appender interface {
// Add adds a sample pair for the referenced series.
Add(lset labels.Labels, t int64, v float64)
// Commit submits the collected samples and purges the batch.
Commit() error
}
// SeriesSet contains a set of series.
type SeriesSet interface {
Next() bool
Series() Series
Err() error
}
// Series represents a single time series.
type Series interface {
// Labels returns the complete set of labels identifying the series.
Labels() labels.Labels
// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator
}
// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the value at or after
// the given timestamp.
Seek(t int64) bool
// Values returns the current timestamp/value pair.
Values() (t int64, v float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}
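To show how the pieces fit together, a sketch of a caller driving the new interfaces end to end; the package name, helper name, label set, and timestamps are illustrative, and the Label struct literal assumes pkg/labels keeps the Name/Value layout implied by the unsafe casts elsewhere in this change.

package example

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// writeAndRead appends a single sample and reads it back. Only the interface
// methods are taken from storage/interface.go; the rest is illustrative.
func writeAndRead(s storage.Storage) error {
	app, err := s.Appender()
	if err != nil {
		return err
	}
	lset := labels.Labels{
		{Name: "__name__", Value: "up"},
		{Name: "job", Value: "node"},
	}
	app.Add(lset, 1000, 1)
	if err := app.Commit(); err != nil {
		return err
	}

	q, err := s.Querier(0, 2000)
	if err != nil {
		return err
	}
	defer q.Close()

	m, err := labels.NewMatcher(labels.MatchEqual, "__name__", "up")
	if err != nil {
		return err
	}
	set, err := q.Select(m)
	if err != nil {
		return err
	}
	for set.Next() {
		series := set.Series()
		it := series.Iterator()
		for it.Next() {
			t, v := it.Values()
			fmt.Println(series.Labels(), t, v)
		}
		if err := it.Err(); err != nil {
			return err
		}
	}
	return set.Err()
}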


@@ -1,117 +0,0 @@
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"errors"
"time"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/util/testutil"
)
var (
ErrOutOfOrderSample = errors.New("out of order sample")
ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp")
)
func NewTestStorage(t testutil.T, enc byte) (Storage, testutil.Closer) {
return nil, nil
}
// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
type Storage interface {
// Querier returns a new Querier on the storage.
Querier() (Querier, error)
// This SampleAppender needs multiple samples for the same fingerprint to be
// submitted in chronological order, from oldest to newest. When Append has
// returned, the appended sample might not be queryable immediately. (Use
// WaitForIndexing to wait for complete processing.) The implementation might
// remove labels with empty value from the provided Sample as those labels
// are considered equivalent to a label not present at all.
//
// Appending is throttled if the Storage has too many chunks in memory
// already or has too many chunks waiting for persistence.
storage.SampleAppender
// Drop all time series associated with the given label matchers. Returns
// the number series that were dropped.
DropMetricsForLabelMatchers(context.Context, ...*metric.LabelMatcher) (int, error)
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Stop is called.
Start() error
// Stop shuts down the Storage gracefully, flushes all pending
// operations, stops all maintenance loops, and frees all resources.
Stop() error
// WaitForIndexing returns once all samples in the storage are
// indexed. Indexing is needed for FingerprintsForLabelMatchers and
// LabelValuesForLabelName and may lag behind.
WaitForIndexing()
}
// Querier allows querying a time series storage.
type Querier interface {
// Close closes the querier. Behavior for subsequent calls to Querier methods
// is undefined.
Close() error
// QueryRange returns a list of series iterators for the selected
// time range and label matchers. The iterators need to be closed
// after usage.
QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
// QueryInstant returns a list of series iterators for the selected
// instant and label matchers. The iterators need to be closed after usage.
QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
// MetricsForLabelMatchers returns the metrics from storage that satisfy
// the given sets of label matchers. Each set of matchers must contain at
// least one label matcher that does not match the empty string. Otherwise,
// an empty list is returned. Within one set of matchers, the intersection
// of matching series is computed. The final return value will be the union
// of the per-set results. The times from and through are hints for the
// storage to optimize the search. The storage MAY exclude metrics that
// have no samples in the specified interval from the returned map. In
// doubt, specify model.Earliest for from and model.Latest for through.
MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error)
// LastSampleForLabelMatchers returns the last samples that have been
// ingested for the time series matching the given set of label matchers.
// The label matching behavior is the same as in MetricsForLabelMatchers.
// All returned samples are between the specified cutoff time and now.
LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error)
// Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(context.Context, model.LabelName) (model.LabelValues, error)
}
// SeriesIterator enables efficient access of sample values in a series. Its
// methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of
// a series, i.e. it is safe to continue using a SeriesIterator after or during
// modifying the corresponding series, but the iterator will represent the state
// of the series prior to the modification.
type SeriesIterator interface {
// Gets the value that is closest before the given time. In case a value
// exists at precisely the given time, that value is returned. If no
// applicable value exists, model.ZeroSamplePair is returned.
ValueAtOrBeforeTime(model.Time) model.SamplePair
// Gets all values contained within a given interval.
RangeValues(metric.Interval) []model.SamplePair
// Returns the metric of the series that the iterator corresponds to.
Metric() metric.Metric
// Closes the iterator and releases the underlying data.
Close()
}


@@ -1,76 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"github.com/prometheus/common/model"
)
// SampleAppender is the interface to append samples to both, local and remote
// storage. All methods are goroutine-safe.
type SampleAppender interface {
// Append appends a sample to the underlying storage. Depending on the
// storage implementation, there are different guarantees for the fate
// of the sample after Append has returned. Remote storage
// implementation will simply drop samples if they cannot keep up with
// sending samples. Local storage implementations will only drop metrics
// upon unrecoverable errors.
Append(*model.Sample) error
// NeedsThrottling returns true if the underlying storage wishes to not
// receive any more samples. Append will still work but might lead to
// undue resource usage. It is recommended to call NeedsThrottling once
// before an upcoming batch of Append calls (e.g. a full scrape of a
// target or the evaluation of a rule group) and only proceed with the
// batch if NeedsThrottling returns false. In that way, the result of a
// scrape or of an evaluation of a rule group will always be appended
// completely or not at all, and the work of scraping or evaluation will
// not be performed in vain. Also, a call of NeedsThrottling is
// potentially expensive, so limiting the number of calls is reasonable.
//
// Only SampleAppenders for which it is considered critical to receive
// each and every sample should ever return true. SampleAppenders that
// tolerate not receiving all samples should always return false and
// instead drop samples as they see fit to avoid overload.
NeedsThrottling() bool
}
// Fanout is a SampleAppender that appends every sample to each SampleAppender
// in its list.
type Fanout []SampleAppender
// Append implements SampleAppender. It appends the provided sample to all
// SampleAppenders in the Fanout slice and waits for each append to complete
// before proceeding with the next.
// If any of the SampleAppenders returns an error, the first one is returned
// at the end.
func (f Fanout) Append(s *model.Sample) error {
var err error
for _, a := range f {
if e := a.Append(s); e != nil && err == nil {
err = e
}
}
return err
}
// NeedsThrottling returns true if at least one of the SampleAppenders in the
// Fanout slice is throttled.
func (f Fanout) NeedsThrottling() bool {
for _, a := range f {
if a.NeedsThrottling() {
return true
}
}
return false
}

storage/tsdb/tsdb.go (new file, 119 lines)

@@ -0,0 +1,119 @@
package tsdb
import (
"unsafe"
"github.com/fabxc/tsdb"
tsdbLabels "github.com/fabxc/tsdb/labels"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
)
// adapter bridges a tsdb.DB to the generic storage interfaces.
type adapter struct {
db *tsdb.DB
}
// Open returns a new storage backed by a tsdb database.
func Open(path string) (storage.Storage, error) {
db, err := tsdb.Open(path, nil, nil)
if err != nil {
return nil, err
}
return &adapter{db: db}, nil
}
// Querier returns a new querier over the storage for the given time range.
func (db *adapter) Querier(mint, maxt int64) (storage.Querier, error) {
q, err := db.db.Querier(mint, maxt)
if err != nil {
return nil, err
}
return &querier{q: q}, nil
}
// Appender returns a new appender against the storage.
func (db *adapter) Appender() (storage.Appender, error) {
a, err := db.db.Appender()
if err != nil {
return nil, err
}
return &appender{a: a}, nil
}
// Close closes the storage and all its underlying resources.
func (db *adapter) Close() error {
return db.db.Close()
}
type querier struct {
q tsdb.Querier
}
func (q *querier) Select(oms ...*labels.Matcher) (storage.SeriesSet, error) {
ms := make([]tsdbLabels.Matcher, 0, len(oms))
for _, om := range oms {
ms = append(ms, convertMatcher(om))
}
set := q.q.Select(ms...)
return &seriesSet{set: set}, nil
}
func (q *querier) LabelValues(name string) ([]string, error) { return q.q.LabelValues(name) }
func (q *querier) Close() error { return q.q.Close() }
type seriesSet struct {
set tsdb.SeriesSet
}
func (s *seriesSet) Next() bool { return s.set.Next() }
func (s *seriesSet) Err() error { return s.set.Err() }
func (s *seriesSet) Series() storage.Series { return &series{s: s.set.Series()} }
type series struct {
s tsdb.Series
}
func (s *series) Labels() labels.Labels { return toLabels(s.s.Labels()) }
func (s *series) Iterator() storage.SeriesIterator { return storage.SeriesIterator(s.s.Iterator()) }
type appender struct {
a tsdb.Appender
}
func (a *appender) Add(lset labels.Labels, t int64, v float64) { a.a.Add(toTSDBLabels(lset), t, v) }
func (a *appender) Commit() error { return a.a.Commit() }
func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher {
switch m.Type {
case labels.MatchEqual:
return tsdbLabels.NewEqualMatcher(m.Name, m.Value)
case labels.MatchNotEqual:
return tsdbLabels.Not(tsdbLabels.NewEqualMatcher(m.Name, m.Value))
case labels.MatchRegexp:
res, err := tsdbLabels.NewRegexpMatcher(m.Name, m.Value)
if err != nil {
panic(err)
}
return res
case labels.MatchNotRegexp:
res, err := tsdbLabels.NewRegexpMatcher(m.Name, m.Value)
if err != nil {
panic(err)
}
return tsdbLabels.Not(res)
}
panic("storage/tsdb.convertMatcher: invalid matcher type")
}
func toTSDBLabels(l labels.Labels) tsdbLabels.Labels {
return *(*tsdbLabels.Labels)(unsafe.Pointer(&l))
}
func toLabels(l tsdbLabels.Labels) labels.Labels {
return *(*labels.Labels)(unsafe.Pointer(&l))
}
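Finally, a sketch of how the tsdb-backed storage might be opened and consumed purely through the generic interfaces (the data directory path, time range, and import alias are illustrative):

package main

import (
	"log"

	"github.com/prometheus/prometheus/storage"
	tsdbStorage "github.com/prometheus/prometheus/storage/tsdb"
)

func main() {
	// "data/" is an illustrative on-disk path for the tsdb database.
	db, err := tsdbStorage.Open("data/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// From here on only the generic interfaces are visible; this is the same
	// shape the promql engine's Queryable now expects.
	var q storage.Querier
	q, err = db.Querier(0, 60000)
	if err != nil {
		log.Fatal(err)
	}
	defer q.Close()
}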