Merge pull request #2850 from tomwilkie/dev-2.0-remote

Remote APIs for v2
pull/3021/head
Fabian Reinartz 2017-08-03 13:39:09 +02:00 committed by GitHub
commit 4d3d8ee229
29 changed files with 3854 additions and 64 deletions

View File

@ -40,6 +40,8 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/prometheus/web"
)
@ -221,20 +223,20 @@ func main() {
}
logger.Infoln("tsdb started")
// remoteStorage := &remote.Storage{}
// sampleAppender = append(sampleAppender, remoteStorage)
// reloadables = append(reloadables, remoteStorage)
remoteStorage := &remote.Storage{}
reloadables = append(reloadables, remoteStorage)
fanoutStorage := storage.NewFanout(tsdb.Adapter(localStorage), remoteStorage)
cfg.queryEngine.Logger = logger
var (
notifier = notifier.New(&cfg.notifier, logger)
targetManager = retrieval.NewTargetManager(tsdb.Adapter(localStorage), logger)
queryEngine = promql.NewEngine(tsdb.Adapter(localStorage), &cfg.queryEngine)
targetManager = retrieval.NewTargetManager(fanoutStorage, logger)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background())
)
ruleManager := rules.NewManager(&rules.ManagerOptions{
Appendable: tsdb.Adapter(localStorage),
Appendable: fanoutStorage,
Notifier: notifier,
QueryEngine: queryEngine,
Context: ctx,
@ -296,8 +298,8 @@ func main() {
// Start all components. The order is NOT arbitrary.
defer func() {
if err := localStorage.Close(); err != nil {
logger.Errorln("Error stopping storage:", err)
if err := fanoutStorage.Close(); err != nil {
log.Errorln("Error stopping storage:", err)
}
}()

View File

@ -22,7 +22,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/prompb"
)
func main() {
@ -39,7 +39,7 @@ func main() {
return
}
var req remote.WriteRequest
var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
@ -53,7 +53,7 @@ func main() {
fmt.Println(m)
for _, s := range ts.Samples {
fmt.Printf(" %f %d\n", s.Value, s.TimestampMs)
fmt.Printf(" %f %d\n", s.Value, s.Timestamp)
}
}
})

View File

@ -22,7 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/prompb"
influx "github.com/influxdata/influxdb/client/v2"
)
@ -101,8 +101,8 @@ func (c *Client) Write(samples model.Samples) error {
return c.client.Write(bps)
}
func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) {
labelsToSeries := map[string]*remote.TimeSeries{}
func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
labelsToSeries := map[string]*prompb.TimeSeries{}
for _, q := range req.Queries {
command, err := c.buildCommand(q)
if err != nil {
@ -123,9 +123,9 @@ func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) {
}
}
resp := remote.ReadResponse{
Results: []*remote.QueryResult{
{Timeseries: make([]*remote.TimeSeries, 0, len(labelsToSeries))},
resp := prompb.ReadResponse{
Results: []*prompb.QueryResult{
{Timeseries: make([]*prompb.TimeSeries, 0, len(labelsToSeries))},
},
}
for _, ts := range labelsToSeries {
@ -134,7 +134,7 @@ func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) {
return &resp, nil
}
func (c *Client) buildCommand(q *remote.Query) (string, error) {
func (c *Client) buildCommand(q *prompb.Query) (string, error) {
matchers := make([]string, 0, len(q.Matchers))
// If we don't find a metric name matcher, query all metrics
// (InfluxDB measurements) by default.
@ -142,9 +142,9 @@ func (c *Client) buildCommand(q *remote.Query) (string, error) {
for _, m := range q.Matchers {
if m.Name == model.MetricNameLabel {
switch m.Type {
case remote.MatchType_EQUAL:
case prompb.LabelMatcher_EQ:
from = fmt.Sprintf("FROM %q.%q", c.retentionPolicy, m.Value)
case remote.MatchType_REGEX_MATCH:
case prompb.LabelMatcher_RE:
from = fmt.Sprintf("FROM %q./^%s$/", c.retentionPolicy, escapeSlashes(m.Value))
default:
// TODO: Figure out how to support these efficiently.
@ -154,13 +154,13 @@ func (c *Client) buildCommand(q *remote.Query) (string, error) {
}
switch m.Type {
case remote.MatchType_EQUAL:
case prompb.LabelMatcher_EQ:
matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value)))
case remote.MatchType_NOT_EQUAL:
case prompb.LabelMatcher_NEQ:
matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value)))
case remote.MatchType_REGEX_MATCH:
case prompb.LabelMatcher_RE:
matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value)))
case remote.MatchType_REGEX_NO_MATCH:
case prompb.LabelMatcher_NRE:
matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value)))
default:
return "", fmt.Errorf("unknown match type %v", m.Type)
@ -180,13 +180,13 @@ func escapeSlashes(str string) string {
return strings.Replace(str, `/`, `\/`, -1)
}
func mergeResult(labelsToSeries map[string]*remote.TimeSeries, results []influx.Result) error {
func mergeResult(labelsToSeries map[string]*prompb.TimeSeries, results []influx.Result) error {
for _, r := range results {
for _, s := range r.Series {
k := concatLabels(s.Tags)
ts, ok := labelsToSeries[k]
if !ok {
ts = &remote.TimeSeries{
ts = &prompb.TimeSeries{
Labels: tagsToLabelPairs(s.Name, s.Tags),
}
labelsToSeries[k] = ts
@ -214,8 +214,8 @@ func concatLabels(labels map[string]string) string {
return strings.Join(pairs, separator)
}
func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair {
pairs := make([]*remote.LabelPair, 0, len(tags))
func tagsToLabelPairs(name string, tags map[string]string) []*prompb.Label {
pairs := make([]*prompb.Label, 0, len(tags))
for k, v := range tags {
if v == "" {
// If we select metrics with different sets of labels names,
@ -226,20 +226,20 @@ func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair {
// to make the result correct.
continue
}
pairs = append(pairs, &remote.LabelPair{
pairs = append(pairs, &prompb.Label{
Name: k,
Value: v,
})
}
pairs = append(pairs, &remote.LabelPair{
pairs = append(pairs, &prompb.Label{
Name: model.MetricNameLabel,
Value: name,
})
return pairs
}
func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) {
samples := make([]*remote.Sample, 0, len(values))
func valuesToSamples(values [][]interface{}) ([]*prompb.Sample, error) {
samples := make([]*prompb.Sample, 0, len(values))
for _, v := range values {
if len(v) != 2 {
return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v)
@ -265,9 +265,9 @@ func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) {
return nil, fmt.Errorf("unable to convert sample value to float64: %v", err)
}
samples = append(samples, &remote.Sample{
TimestampMs: timestamp,
Value: value,
samples = append(samples, &prompb.Sample{
Timestamp: timestamp,
Value: value,
})
}
return samples, nil
@ -275,14 +275,14 @@ func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) {
// mergeSamples merges two lists of sample pairs and removes duplicate
// timestamps. It assumes that both lists are sorted by timestamp.
func mergeSamples(a, b []*remote.Sample) []*remote.Sample {
result := make([]*remote.Sample, 0, len(a)+len(b))
func mergeSamples(a, b []*prompb.Sample) []*prompb.Sample {
result := make([]*prompb.Sample, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].TimestampMs < b[j].TimestampMs {
if a[i].Timestamp < b[j].Timestamp {
result = append(result, a[i])
i++
} else if a[i].TimestampMs > b[j].TimestampMs {
} else if a[i].Timestamp > b[j].Timestamp {
result = append(result, b[j])
j++
} else {

View File

@ -36,7 +36,7 @@ import (
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/graphite"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/influxdb"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/opentsdb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/prompb"
)
type config struct {
@ -146,7 +146,7 @@ type writer interface {
}
type reader interface {
Read(req *remote.ReadRequest) (*remote.ReadResponse, error)
Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error)
Name() string
}
@ -196,7 +196,7 @@ func serve(addr string, writers []writer, readers []reader) error {
return
}
var req remote.WriteRequest
var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
@ -229,7 +229,7 @@ func serve(addr string, writers []writer, readers []reader) error {
return
}
var req remote.ReadRequest
var req prompb.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
@ -242,7 +242,7 @@ func serve(addr string, writers []writer, readers []reader) error {
}
reader := readers[0]
var resp *remote.ReadResponse
var resp *prompb.ReadResponse
resp, err = reader.Read(&req)
if err != nil {
log.With("query", req).With("storage", reader.Name()).With("err", err).Warnf("Error executing query")
@ -269,7 +269,7 @@ func serve(addr string, writers []writer, readers []reader) error {
return http.ListenAndServe(addr, nil)
}
func protoToSamples(req *remote.WriteRequest) model.Samples {
func protoToSamples(req *prompb.WriteRequest) model.Samples {
var samples model.Samples
for _, ts := range req.Timeseries {
metric := make(model.Metric, len(ts.Labels))
@ -281,7 +281,7 @@ func protoToSamples(req *remote.WriteRequest) model.Samples {
samples = append(samples, &model.Sample{
Metric: metric,
Value: model.SampleValue(s.Value),
Timestamp: model.Time(s.TimestampMs),
Timestamp: model.Time(s.Timestamp),
})
}
}

987
prompb/remote.pb.go Normal file
View File

@ -0,0 +1,987 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: remote.proto
/*
Package prompb is a generated protocol buffer package.
It is generated from these files:
remote.proto
rpc.proto
types.proto
It has these top-level messages:
WriteRequest
ReadRequest
ReadResponse
Query
QueryResult
TSDBSnapshotRequest
TSDBSnapshotResponse
SeriesDeleteRequest
SeriesDeleteResponse
Sample
TimeSeries
Label
Labels
LabelMatcher
*/
package prompb
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import io "io"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type WriteRequest struct {
Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"`
}
func (m *WriteRequest) Reset() { *m = WriteRequest{} }
func (m *WriteRequest) String() string { return proto.CompactTextString(m) }
func (*WriteRequest) ProtoMessage() {}
func (*WriteRequest) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{0} }
func (m *WriteRequest) GetTimeseries() []*TimeSeries {
if m != nil {
return m.Timeseries
}
return nil
}
type ReadRequest struct {
Queries []*Query `protobuf:"bytes,1,rep,name=queries" json:"queries,omitempty"`
}
func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{1} }
func (m *ReadRequest) GetQueries() []*Query {
if m != nil {
return m.Queries
}
return nil
}
type ReadResponse struct {
// In same order as the request's queries.
Results []*QueryResult `protobuf:"bytes,1,rep,name=results" json:"results,omitempty"`
}
func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{2} }
func (m *ReadResponse) GetResults() []*QueryResult {
if m != nil {
return m.Results
}
return nil
}
type Query struct {
StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs,proto3" json:"start_timestamp_ms,omitempty"`
EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs,proto3" json:"end_timestamp_ms,omitempty"`
Matchers []*LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers,omitempty"`
}
func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{3} }
func (m *Query) GetStartTimestampMs() int64 {
if m != nil {
return m.StartTimestampMs
}
return 0
}
func (m *Query) GetEndTimestampMs() int64 {
if m != nil {
return m.EndTimestampMs
}
return 0
}
func (m *Query) GetMatchers() []*LabelMatcher {
if m != nil {
return m.Matchers
}
return nil
}
type QueryResult struct {
Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"`
}
func (m *QueryResult) Reset() { *m = QueryResult{} }
func (m *QueryResult) String() string { return proto.CompactTextString(m) }
func (*QueryResult) ProtoMessage() {}
func (*QueryResult) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{4} }
func (m *QueryResult) GetTimeseries() []*TimeSeries {
if m != nil {
return m.Timeseries
}
return nil
}
func init() {
proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest")
proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest")
proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse")
proto.RegisterType((*Query)(nil), "prometheus.Query")
proto.RegisterType((*QueryResult)(nil), "prometheus.QueryResult")
}
func (m *WriteRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *WriteRequest) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, msg := range m.Timeseries {
dAtA[i] = 0xa
i++
i = encodeVarintRemote(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
return i, nil
}
func (m *ReadRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Queries) > 0 {
for _, msg := range m.Queries {
dAtA[i] = 0xa
i++
i = encodeVarintRemote(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
return i, nil
}
func (m *ReadResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ReadResponse) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Results) > 0 {
for _, msg := range m.Results {
dAtA[i] = 0xa
i++
i = encodeVarintRemote(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
return i, nil
}
func (m *Query) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Query) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.StartTimestampMs != 0 {
dAtA[i] = 0x8
i++
i = encodeVarintRemote(dAtA, i, uint64(m.StartTimestampMs))
}
if m.EndTimestampMs != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintRemote(dAtA, i, uint64(m.EndTimestampMs))
}
if len(m.Matchers) > 0 {
for _, msg := range m.Matchers {
dAtA[i] = 0x1a
i++
i = encodeVarintRemote(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
return i, nil
}
func (m *QueryResult) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *QueryResult) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, msg := range m.Timeseries {
dAtA[i] = 0xa
i++
i = encodeVarintRemote(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
return i, nil
}
func encodeFixed64Remote(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
dAtA[offset+4] = uint8(v >> 32)
dAtA[offset+5] = uint8(v >> 40)
dAtA[offset+6] = uint8(v >> 48)
dAtA[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32Remote(dAtA []byte, offset int, v uint32) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintRemote(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *WriteRequest) Size() (n int) {
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
return n
}
func (m *ReadRequest) Size() (n int) {
var l int
_ = l
if len(m.Queries) > 0 {
for _, e := range m.Queries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
return n
}
func (m *ReadResponse) Size() (n int) {
var l int
_ = l
if len(m.Results) > 0 {
for _, e := range m.Results {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
return n
}
func (m *Query) Size() (n int) {
var l int
_ = l
if m.StartTimestampMs != 0 {
n += 1 + sovRemote(uint64(m.StartTimestampMs))
}
if m.EndTimestampMs != 0 {
n += 1 + sovRemote(uint64(m.EndTimestampMs))
}
if len(m.Matchers) > 0 {
for _, e := range m.Matchers {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
return n
}
func (m *QueryResult) Size() (n int) {
var l int
_ = l
if len(m.Timeseries) > 0 {
for _, e := range m.Timeseries {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
return n
}
func sovRemote(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozRemote(x uint64) (n int) {
return sovRemote(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *WriteRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: WriteRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: WriteRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, &TimeSeries{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ReadRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ReadRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ReadRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Queries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Queries = append(m.Queries, &Query{})
if err := m.Queries[len(m.Queries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ReadResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ReadResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ReadResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Results", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Results = append(m.Results, &QueryResult{})
if err := m.Results[len(m.Results)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Query) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Query: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Query: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field StartTimestampMs", wireType)
}
m.StartTimestampMs = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.StartTimestampMs |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EndTimestampMs", wireType)
}
m.EndTimestampMs = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.EndTimestampMs |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Matchers", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Matchers = append(m.Matchers, &LabelMatcher{})
if err := m.Matchers[len(m.Matchers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *QueryResult) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: QueryResult: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: QueryResult: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Timeseries = append(m.Timeseries, &TimeSeries{})
if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthRemote
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipRemote(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRemote
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRemote
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRemote
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
iNdEx += length
if length < 0 {
return 0, ErrInvalidLengthRemote
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowRemote
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipRemote(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthRemote = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowRemote = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("remote.proto", fileDescriptorRemote) }
var fileDescriptorRemote = []byte{
// 284 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0xcf, 0x4a, 0xc3, 0x40,
0x10, 0xc6, 0x59, 0x8b, 0xad, 0x4c, 0x8a, 0xd4, 0x45, 0x34, 0x78, 0x08, 0x92, 0x53, 0x40, 0x09,
0xf8, 0x07, 0x0f, 0xde, 0x14, 0xf4, 0x64, 0x0f, 0xae, 0x05, 0xc1, 0x4b, 0x49, 0xcc, 0x40, 0x03,
0xdd, 0x64, 0xbb, 0x33, 0x39, 0xf4, 0x41, 0x7c, 0x27, 0x8f, 0x3e, 0x82, 0xe4, 0x49, 0xa4, 0x1b,
0xa2, 0x2b, 0xde, 0x3c, 0xff, 0x7e, 0xdf, 0xc7, 0xc7, 0x0c, 0x8c, 0x2d, 0xea, 0x9a, 0x31, 0x35,
0xb6, 0xe6, 0x5a, 0x82, 0xb1, 0xb5, 0x46, 0x5e, 0x60, 0x43, 0x47, 0x01, 0xaf, 0x0d, 0x52, 0x07,
0xe2, 0x7b, 0x18, 0x3f, 0xdb, 0x92, 0x51, 0xe1, 0xaa, 0x41, 0x62, 0x79, 0x05, 0xc0, 0xa5, 0x46,
0x42, 0x5b, 0x22, 0x85, 0xe2, 0x78, 0x90, 0x04, 0xe7, 0x07, 0xe9, 0x4f, 0x3a, 0x9d, 0x95, 0x1a,
0x9f, 0x1c, 0x55, 0x9e, 0x19, 0x5f, 0x43, 0xa0, 0x30, 0x2b, 0xfa, 0x9a, 0x13, 0x18, 0xad, 0x1a,
0xbf, 0x63, 0xcf, 0xef, 0x78, 0x6c, 0xd0, 0xae, 0x55, 0x6f, 0xc4, 0x37, 0x30, 0xee, 0xb2, 0x64,
0xea, 0x8a, 0x50, 0x9e, 0xc1, 0xc8, 0x22, 0x35, 0x4b, 0xee, 0xc3, 0x87, 0x7f, 0xc3, 0x8e, 0xab,
0xde, 0x8b, 0xdf, 0x04, 0x6c, 0x3b, 0x20, 0x4f, 0x41, 0x12, 0x67, 0x96, 0xe7, 0x6e, 0x1c, 0x67,
0xda, 0xcc, 0xf5, 0xa6, 0x47, 0x24, 0x03, 0x35, 0x71, 0x64, 0xd6, 0x83, 0x29, 0xc9, 0x04, 0x26,
0x58, 0x15, 0xbf, 0xdd, 0x2d, 0xe7, 0xee, 0x62, 0x55, 0xf8, 0xe6, 0x25, 0xec, 0xe8, 0x8c, 0x5f,
0x17, 0x68, 0x29, 0x1c, 0xb8, 0x55, 0xa1, 0xbf, 0xea, 0x21, 0xcb, 0x71, 0x39, 0xed, 0x04, 0xf5,
0x6d, 0xc6, 0x77, 0x10, 0x78, 0x7b, 0xff, 0x7b, 0xdd, 0xdb, 0xfd, 0xf7, 0x36, 0x12, 0x1f, 0x6d,
0x24, 0x3e, 0xdb, 0x48, 0xbc, 0x0c, 0x37, 0x01, 0x93, 0xe7, 0x43, 0xf7, 0xc2, 0x8b, 0xaf, 0x00,
0x00, 0x00, 0xff, 0xff, 0x0a, 0x31, 0xaa, 0x60, 0xeb, 0x01, 0x00, 0x00,
}

42
prompb/remote.proto Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2016 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package prometheus;
option go_package = "prompb";
import "types.proto";
message WriteRequest {
repeated prometheus.TimeSeries timeseries = 1;
}
message ReadRequest {
repeated Query queries = 1;
}
message ReadResponse {
// In same order as the request's queries.
repeated QueryResult results = 1;
}
message Query {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated prometheus.LabelMatcher matchers = 3;
}
message QueryResult {
repeated prometheus.TimeSeries timeseries = 1;
}

View File

@ -27,16 +27,16 @@ func (a nopAppendable) Appender() (storage.Appender, error) {
type nopAppender struct{}
func (a nopAppender) Add(labels.Labels, int64, float64) (string, error) { return "", nil }
func (a nopAppender) AddFast(string, int64, float64) error { return nil }
func (a nopAppender) Commit() error { return nil }
func (a nopAppender) Rollback() error { return nil }
func (a nopAppender) Add(labels.Labels, int64, float64) (string, error) { return "", nil }
func (a nopAppender) AddFast(labels.Labels, string, int64, float64) error { return nil }
func (a nopAppender) Commit() error { return nil }
func (a nopAppender) Rollback() error { return nil }
type collectResultAppender struct {
result []sample
}
func (a *collectResultAppender) AddFast(ref string, t int64, v float64) error {
func (a *collectResultAppender) AddFast(m labels.Labels, ref string, t int64, v float64) error {
// Not implemented.
return storage.ErrNotFound
}

View File

@ -773,7 +773,8 @@ loop:
ref, ok := sl.cache.getRef(yoloString(met))
if ok {
switch err = app.AddFast(ref, t, v); err {
lset := sl.cache.lsets[ref].lset
switch err = app.AddFast(lset, ref, t, v); err {
case nil:
if tp == nil {
e := sl.cache.lsets[ref]
@ -975,7 +976,8 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
ref, ok := sl.cache.getRef(s2)
if ok {
err := app.AddFast(ref, t, v)
lset := sl.cache.lsets[ref].lset
err := app.AddFast(lset, ref, t, v)
switch err {
case nil:
return nil
@ -989,13 +991,13 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
return err
}
}
met := labels.Labels{
lset := labels.Labels{
labels.Label{Name: labels.MetricName, Value: s},
}
ref, err := app.Add(met, t, v)
ref, err := app.Add(lset, t, v)
switch err {
case nil:
sl.cache.addRef(s2, ref, met, met.Hash())
sl.cache.addRef(s2, ref, lset, lset.Hash())
return nil
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
return nil

View File

@ -885,8 +885,8 @@ func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (string, e
}
}
func (app *errorAppender) AddFast(ref string, t int64, v float64) error {
return app.collectResultAppender.AddFast(ref, t, v)
func (app *errorAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error {
return app.collectResultAppender.AddFast(lset, ref, t, v)
}
func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) {

View File

@ -212,14 +212,14 @@ func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (string, e
return ref, nil
}
func (app *limitAppender) AddFast(ref string, t int64, v float64) error {
func (app *limitAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error {
if !value.IsStaleNaN(v) {
app.i++
if app.i > app.limit {
return errSampleLimit
}
}
if err := app.Appender.AddFast(ref, t, v); err != nil {
if err := app.Appender.AddFast(lset, ref, t, v); err != nil {
return err
}
return nil
@ -243,11 +243,11 @@ func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (strin
return ref, nil
}
func (app *timeLimitAppender) AddFast(ref string, t int64, v float64) error {
func (app *timeLimitAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error {
if t > app.maxTime {
return storage.ErrOutOfBounds
}
if err := app.Appender.AddFast(ref, t, v); err != nil {
if err := app.Appender.AddFast(lset, ref, t, v); err != nil {
return err
}
return nil

433
storage/fanout.go Normal file
View File

@ -0,0 +1,433 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.package remote
package storage
import (
"container/heap"
"strings"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/pkg/labels"
)
type fanout struct {
primary Storage
secondaries []Storage
}
// NewFanout returns a new fan-out Storage, which proxies reads and writes
// through to multiple underlying storages.
func NewFanout(primary Storage, secondaries ...Storage) Storage {
return &fanout{
primary: primary,
secondaries: secondaries,
}
}
func (f *fanout) Querier(mint, maxt int64) (Querier, error) {
queriers := mergeQuerier{
queriers: make([]Querier, 0, 1+len(f.secondaries)),
}
// Add primary querier
querier, err := f.primary.Querier(mint, maxt)
if err != nil {
return nil, err
}
queriers.queriers = append(queriers.queriers, querier)
// Add secondary queriers
for _, storage := range f.secondaries {
querier, err := storage.Querier(mint, maxt)
if err != nil {
queriers.Close()
return nil, err
}
queriers.queriers = append(queriers.queriers, querier)
}
return &queriers, nil
}
func (f *fanout) Appender() (Appender, error) {
primary, err := f.primary.Appender()
if err != nil {
return nil, err
}
secondaries := make([]Appender, 0, len(f.secondaries))
for _, storage := range f.secondaries {
appender, err := storage.Appender()
if err != nil {
return nil, err
}
secondaries = append(secondaries, appender)
}
return &fanoutAppender{
primary: primary,
secondaries: secondaries,
}, nil
}
// Close closes the storage and all its underlying resources.
func (f *fanout) Close() error {
if err := f.primary.Close(); err != nil {
return err
}
// TODO return multiple errors?
var lastErr error
for _, storage := range f.secondaries {
if err := storage.Close(); err != nil {
lastErr = err
}
}
return lastErr
}
// fanoutAppender implements Appender.
type fanoutAppender struct {
primary Appender
secondaries []Appender
}
func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (string, error) {
ref, err := f.primary.Add(l, t, v)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.Add(l, t, v); err != nil {
return "", err
}
}
return ref, nil
}
func (f *fanoutAppender) AddFast(l labels.Labels, ref string, t int64, v float64) error {
if err := f.primary.AddFast(l, ref, t, v); err != nil {
return err
}
for _, appender := range f.secondaries {
if _, err := appender.Add(l, t, v); err != nil {
return err
}
}
return nil
}
func (f *fanoutAppender) Commit() (err error) {
err = f.primary.Commit()
for _, appender := range f.secondaries {
if err == nil {
err = appender.Commit()
} else {
if rollbackErr := appender.Rollback(); rollbackErr != nil {
log.Errorf("Squashed rollback error on commit: %v", rollbackErr)
}
}
}
return
}
func (f *fanoutAppender) Rollback() (err error) {
err = f.primary.Rollback()
for _, appender := range f.secondaries {
rollbackErr := appender.Rollback()
if err == nil {
err = rollbackErr
} else if rollbackErr != nil {
log.Errorf("Squashed rollback error on rollback: %v", rollbackErr)
}
}
return nil
}
// mergeQuerier implements Querier.
type mergeQuerier struct {
queriers []Querier
}
// NewMergeQuerier returns a new Querier that merges results of input queriers.
func NewMergeQuerier(queriers []Querier) Querier {
return &mergeQuerier{
queriers: queriers,
}
}
// Select returns a set of series that matches the given label matchers.
func (q *mergeQuerier) Select(matchers ...*labels.Matcher) SeriesSet {
seriesSets := make([]SeriesSet, 0, len(q.queriers))
for _, querier := range q.queriers {
seriesSets = append(seriesSets, querier.Select(matchers...))
}
return newMergeSeriesSet(seriesSets)
}
// LabelValues returns all potential values for a label name.
func (q *mergeQuerier) LabelValues(name string) ([]string, error) {
var results [][]string
for _, querier := range q.queriers {
values, err := querier.LabelValues(name)
if err != nil {
return nil, err
}
results = append(results, values)
}
return mergeStringSlices(results), nil
}
func mergeStringSlices(ss [][]string) []string {
switch len(ss) {
case 0:
return nil
case 1:
return ss[0]
case 2:
return mergeTwoStringSlices(ss[0], ss[1])
default:
halfway := len(ss) / 2
return mergeTwoStringSlices(
mergeStringSlices(ss[:halfway]),
mergeStringSlices(ss[halfway:]),
)
}
}
func mergeTwoStringSlices(a, b []string) []string {
i, j := 0, 0
result := make([]string, 0, len(a)+len(b))
for i < len(a) && j < len(b) {
switch strings.Compare(a[i], b[j]) {
case 0:
result = append(result, a[i])
i++
j++
case -1:
result = append(result, a[i])
i++
case 1:
result = append(result, b[j])
j++
}
}
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
}
// Close releases the resources of the Querier.
func (q *mergeQuerier) Close() error {
// TODO return multiple errors?
var lastErr error
for _, querier := range q.queriers {
if err := querier.Close(); err != nil {
lastErr = err
}
}
return lastErr
}
// mergeSeriesSet implements SeriesSet
type mergeSeriesSet struct {
currentLabels labels.Labels
currentSets []SeriesSet
heap seriesSetHeap
sets []SeriesSet
}
func newMergeSeriesSet(sets []SeriesSet) SeriesSet {
// Sets need to be pre-advanced, so we can introspect the label of the
// series under the cursor.
var h seriesSetHeap
for _, set := range sets {
if set.Next() {
heap.Push(&h, set)
}
}
return &mergeSeriesSet{
heap: h,
sets: sets,
}
}
func (c *mergeSeriesSet) Next() bool {
// Firstly advance all the current series sets. If any of them have run out
// we can drop them, otherwise they should be inserted back into the heap.
for _, set := range c.currentSets {
if set.Next() {
heap.Push(&c.heap, set)
}
}
if len(c.heap) == 0 {
return false
}
// Now, pop items of the heap that have equal label sets.
c.currentSets = nil
c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(SeriesSet)
c.currentSets = append(c.currentSets, set)
}
return true
}
func (c *mergeSeriesSet) At() Series {
series := []Series{}
for _, seriesSet := range c.currentSets {
series = append(series, seriesSet.At())
}
return &mergeSeries{
labels: c.currentLabels,
series: series,
}
}
func (c *mergeSeriesSet) Err() error {
for _, set := range c.sets {
if err := set.Err(); err != nil {
return err
}
}
return nil
}
type seriesSetHeap []SeriesSet
func (h seriesSetHeap) Len() int { return len(h) }
func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h seriesSetHeap) Less(i, j int) bool {
a, b := h[i].At().Labels(), h[j].At().Labels()
return labels.Compare(a, b) < 0
}
func (h *seriesSetHeap) Push(x interface{}) {
*h = append(*h, x.(SeriesSet))
}
func (h *seriesSetHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
type mergeSeries struct {
labels labels.Labels
series []Series
}
func (m *mergeSeries) Labels() labels.Labels {
return m.labels
}
func (m *mergeSeries) Iterator() SeriesIterator {
iterators := make([]SeriesIterator, 0, len(m.series))
for _, s := range m.series {
iterators = append(iterators, s.Iterator())
}
return newMergeIterator(iterators)
}
type mergeIterator struct {
iterators []SeriesIterator
h seriesIteratorHeap
}
func newMergeIterator(iterators []SeriesIterator) SeriesIterator {
return &mergeIterator{
iterators: iterators,
h: nil,
}
}
func (c *mergeIterator) Seek(t int64) bool {
c.h = seriesIteratorHeap{}
for _, iter := range c.iterators {
if iter.Seek(t) {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
func (c *mergeIterator) At() (t int64, v float64) {
if len(c.h) == 0 {
log.Error("mergeIterator.At() called after .Next() returned false.")
return 0, 0
}
// TODO do I need to dedupe or just merge?
return c.h[0].At()
}
func (c *mergeIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
if iter.Next() {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
if len(c.h) == 0 {
return false
}
iter := heap.Pop(&c.h).(SeriesIterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
return len(c.h) > 0
}
func (c *mergeIterator) Err() error {
for _, iter := range c.iterators {
if err := iter.Err(); err != nil {
return err
}
}
return nil
}
type seriesIteratorHeap []SeriesIterator
func (h seriesIteratorHeap) Len() int { return len(h) }
func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h seriesIteratorHeap) Less(i, j int) bool {
at, _ := h[i].At()
bt, _ := h[j].At()
return at < bt
}
func (h *seriesIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(SeriesIterator))
}
func (h *seriesIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

230
storage/fanout_test.go Normal file
View File

@ -0,0 +1,230 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.package remote
package storage
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/pkg/labels"
)
func TestMergeStringSlices(t *testing.T) {
for _, tc := range []struct {
input [][]string
expected []string
}{
{},
{[][]string{{"foo"}}, []string{"foo"}},
{[][]string{{"foo"}, {"bar"}}, []string{"bar", "foo"}},
{[][]string{{"foo"}, {"bar"}, {"baz"}}, []string{"bar", "baz", "foo"}},
} {
require.Equal(t, tc.expected, mergeStringSlices(tc.input))
}
}
func TestMergeTwoStringSlices(t *testing.T) {
for _, tc := range []struct {
a, b, expected []string
}{
{[]string{}, []string{}, []string{}},
{[]string{"foo"}, nil, []string{"foo"}},
{nil, []string{"bar"}, []string{"bar"}},
{[]string{"foo"}, []string{"bar"}, []string{"bar", "foo"}},
{[]string{"foo"}, []string{"bar", "baz"}, []string{"bar", "baz", "foo"}},
{[]string{"foo"}, []string{"foo"}, []string{"foo"}},
} {
require.Equal(t, tc.expected, mergeTwoStringSlices(tc.a, tc.b))
}
}
func TestMergeSeriesSet(t *testing.T) {
for _, tc := range []struct {
input []SeriesSet
expected SeriesSet
}{
{
input: []SeriesSet{newMockSeriesSet()},
expected: newMockSeriesSet(),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
), newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
), newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{3, 3}, {4, 4}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{2, 2}, {3, 3}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}}),
),
},
} {
merged := newMergeSeriesSet(tc.input)
for merged.Next() {
require.True(t, tc.expected.Next())
actualSeries := merged.At()
expectedSeries := tc.expected.At()
require.Equal(t, expectedSeries.Labels(), actualSeries.Labels())
require.Equal(t, drainSamples(expectedSeries.Iterator()), drainSamples(actualSeries.Iterator()))
}
require.False(t, tc.expected.Next())
}
}
func TestMergeIterator(t *testing.T) {
for _, tc := range []struct {
input []SeriesIterator
expected []sample
}{
{
input: []SeriesIterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
},
expected: []sample{{0, 0}, {1, 1}},
},
{
input: []SeriesIterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
newListSeriesIterator([]sample{{2, 2}, {3, 3}}),
},
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}},
},
{
input: []SeriesIterator{
newListSeriesIterator([]sample{{0, 0}, {3, 3}}),
newListSeriesIterator([]sample{{1, 1}, {4, 4}}),
newListSeriesIterator([]sample{{2, 2}, {5, 5}}),
},
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}},
},
} {
merged := newMergeIterator(tc.input)
actual := drainSamples(merged)
require.Equal(t, tc.expected, actual)
}
}
func TestMergeIteratorSeek(t *testing.T) {
for _, tc := range []struct {
input []SeriesIterator
seek int64
expected []sample
}{
{
input: []SeriesIterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}, {2, 2}}),
},
seek: 1,
expected: []sample{{1, 1}, {2, 2}},
},
{
input: []SeriesIterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
newListSeriesIterator([]sample{{2, 2}, {3, 3}}),
},
seek: 2,
expected: []sample{{2, 2}, {3, 3}},
},
{
input: []SeriesIterator{
newListSeriesIterator([]sample{{0, 0}, {3, 3}}),
newListSeriesIterator([]sample{{1, 1}, {4, 4}}),
newListSeriesIterator([]sample{{2, 2}, {5, 5}}),
},
seek: 2,
expected: []sample{{2, 2}, {3, 3}, {4, 4}, {5, 5}},
},
} {
merged := newMergeIterator(tc.input)
actual := []sample{}
if merged.Seek(tc.seek) {
t, v := merged.At()
actual = append(actual, sample{t, v})
}
actual = append(actual, drainSamples(merged)...)
require.Equal(t, tc.expected, actual)
}
}
func drainSamples(iter SeriesIterator) []sample {
result := []sample{}
for iter.Next() {
t, v := iter.At()
result = append(result, sample{t, v})
}
return result
}
type mockSeriesSet struct {
idx int
series []Series
}
func newMockSeriesSet(series ...Series) SeriesSet {
return &mockSeriesSet{
idx: -1,
series: series,
}
}
func (m *mockSeriesSet) Next() bool {
m.idx++
return m.idx < len(m.series)
}
func (m *mockSeriesSet) At() Series {
return m.series[m.idx]
}
func (m *mockSeriesSet) Err() error {
return nil
}
func newMockSeries(lset labels.Labels, samples []sample) Series {
return &mockSeries{
labels: func() labels.Labels {
return lset
},
iterator: func() SeriesIterator {
return newListSeriesIterator(samples)
},
}
}

View File

@ -56,7 +56,7 @@ type Querier interface {
type Appender interface {
Add(l labels.Labels, t int64, v float64) (string, error)
AddFast(ref string, t int64, v float64) error
AddFast(l labels.Labels, ref string, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error

201
storage/remote/client.go Normal file
View File

@ -0,0 +1,201 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/util/httputil"
)
const maxErrMsgLen = 256
// Client allows reading and writing from/to a remote HTTP endpoint.
type Client struct {
index int // Used to differentiate metrics.
url *config.URL
client *http.Client
timeout time.Duration
}
type clientConfig struct {
url *config.URL
timeout model.Duration
httpClientConfig config.HTTPClientConfig
}
// NewClient creates a new Client.
func NewClient(index int, conf *clientConfig) (*Client, error) {
httpClient, err := httputil.NewClientFromConfig(conf.httpClientConfig)
if err != nil {
return nil, err
}
return &Client{
index: index,
url: conf.url,
client: httpClient,
timeout: time.Duration(conf.timeout),
}, nil
}
type recoverableError struct {
error
}
// Store sends a batch of samples to the HTTP endpoint.
func (c *Client) Store(samples model.Samples) error {
req := &prompb.WriteRequest{
Timeseries: make([]*prompb.TimeSeries, 0, len(samples)),
}
for _, s := range samples {
ts := &prompb.TimeSeries{
Labels: make([]*prompb.Label, 0, len(s.Metric)),
}
for k, v := range s.Metric {
ts.Labels = append(ts.Labels,
&prompb.Label{
Name: string(k),
Value: string(v),
})
}
ts.Samples = []*prompb.Sample{
{
Value: float64(s.Value),
Timestamp: int64(s.Timestamp),
},
}
req.Timeseries = append(req.Timeseries, ts)
}
data, err := proto.Marshal(req)
if err != nil {
return err
}
compressed := snappy.Encode(nil, data)
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))
if err != nil {
// Errors from NewRequest are from unparseable URLs, so are not
// recoverable.
return err
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
if err != nil {
// Errors from client.Do are from (for example) network errors, so are
// recoverable.
return recoverableError{err}
}
defer httpResp.Body.Close()
if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
}
if httpResp.StatusCode/100 == 5 {
return recoverableError{err}
}
return err
}
// Name identifies the client.
func (c Client) Name() string {
return fmt.Sprintf("%d:%s", c.index, c.url)
}
// Read reads from a remote endpoint.
func (c *Client) Read(ctx context.Context, from, through int64, matchers []*prompb.LabelMatcher) ([]*prompb.TimeSeries, error) {
req := &prompb.ReadRequest{
// TODO: Support batching multiple queries into one read request,
// as the protobuf interface allows for it.
Queries: []*prompb.Query{{
StartTimestampMs: from,
EndTimestampMs: through,
Matchers: matchers,
}},
}
data, err := proto.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to marshal read request: %v", err)
}
compressed := snappy.Encode(nil, data)
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))
if err != nil {
return nil, fmt.Errorf("unable to create request: %v", err)
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
if err != nil {
return nil, fmt.Errorf("error sending request: %v", err)
}
defer httpResp.Body.Close()
if httpResp.StatusCode/100 != 2 {
return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status)
}
compressed, err = ioutil.ReadAll(httpResp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response: %v", err)
}
uncompressed, err := snappy.Decode(nil, compressed)
if err != nil {
return nil, fmt.Errorf("error reading response: %v", err)
}
var resp prompb.ReadResponse
err = proto.Unmarshal(uncompressed, &resp)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal response body: %v", err)
}
if len(resp.Results) != len(req.Queries) {
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
}
return resp.Results[0].Timeseries, nil
}

View File

@ -0,0 +1,79 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.package remote
package remote
import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
)
var longErrMessage = strings.Repeat("error message", maxErrMsgLen)
func TestStoreHTTPErrorHandling(t *testing.T) {
tests := []struct {
code int
err error
}{
{
code: 200,
err: nil,
},
{
code: 300,
err: fmt.Errorf("server returned HTTP status 300 Multiple Choices: " + longErrMessage[:maxErrMsgLen]),
},
{
code: 404,
err: fmt.Errorf("server returned HTTP status 404 Not Found: " + longErrMessage[:maxErrMsgLen]),
},
{
code: 500,
err: recoverableError{fmt.Errorf("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen])},
},
}
for i, test := range tests {
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, longErrMessage, test.code)
}),
)
serverURL, err := url.Parse(server.URL)
if err != nil {
panic(err)
}
c, err := NewClient(0, &clientConfig{
url: &config.URL{serverURL},
timeout: model.Duration(time.Second),
})
err = c.Store(nil)
if !reflect.DeepEqual(err, test.err) {
t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err)
}
server.Close()
}
}

68
storage/remote/ewma.go Normal file
View File

@ -0,0 +1,68 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"sync"
"sync/atomic"
"time"
)
// ewmaRate tracks an exponentially weighted moving average of a per-second rate.
type ewmaRate struct {
newEvents int64
alpha float64
interval time.Duration
lastRate float64
init bool
mutex sync.Mutex
}
// newEWMARate always allocates a new ewmaRate, as this guarantees the atomically
// accessed int64 will be aligned on ARM. See prometheus#2666.
func newEWMARate(alpha float64, interval time.Duration) *ewmaRate {
return &ewmaRate{
alpha: alpha,
interval: interval,
}
}
// rate returns the per-second rate.
func (r *ewmaRate) rate() float64 {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.lastRate
}
// tick assumes to be called every r.interval.
func (r *ewmaRate) tick() {
newEvents := atomic.LoadInt64(&r.newEvents)
atomic.AddInt64(&r.newEvents, -newEvents)
instantRate := float64(newEvents) / r.interval.Seconds()
r.mutex.Lock()
defer r.mutex.Unlock()
if r.init {
r.lastRate += r.alpha * (instantRate - r.lastRate)
} else {
r.init = true
r.lastRate = instantRate
}
}
// inc counts one event.
func (r *ewmaRate) incr(incr int64) {
atomic.AddInt64(&r.newEvents, incr)
}

View File

@ -0,0 +1,514 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"math"
"sync"
"time"
"golang.org/x/time/rate"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/relabel"
)
// String constants for instrumentation.
const (
namespace = "prometheus"
subsystem = "remote_storage"
queue = "queue"
// We track samples in/out and how long pushes take using an Exponentially
// Weighted Moving Average.
ewmaWeight = 0.2
shardUpdateDuration = 10 * time.Second
// Allow 30% too many shards before scaling down.
shardToleranceFraction = 0.3
// Limit to 1 log event every 10s
logRateLimit = 0.1
logBurst = 10
)
var (
succeededSamplesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "succeeded_samples_total",
Help: "Total number of samples successfully sent to remote storage.",
},
[]string{queue},
)
failedSamplesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "failed_samples_total",
Help: "Total number of samples which failed on send to remote storage.",
},
[]string{queue},
)
droppedSamplesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dropped_samples_total",
Help: "Total number of samples which were dropped due to the queue being full.",
},
[]string{queue},
)
sentBatchDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_batch_duration_seconds",
Help: "Duration of sample batch send calls to the remote storage.",
Buckets: prometheus.DefBuckets,
},
[]string{queue},
)
queueLength = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_length",
Help: "The number of processed samples queued to be sent to the remote storage.",
},
[]string{queue},
)
queueCapacity = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_capacity",
Help: "The capacity of the queue of samples to be sent to the remote storage.",
},
[]string{queue},
)
numShards = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "shards",
Help: "The number of shards used for parallel sending to the remote storage.",
},
[]string{queue},
)
)
func init() {
prometheus.MustRegister(succeededSamplesTotal)
prometheus.MustRegister(failedSamplesTotal)
prometheus.MustRegister(droppedSamplesTotal)
prometheus.MustRegister(sentBatchDuration)
prometheus.MustRegister(queueLength)
prometheus.MustRegister(queueCapacity)
prometheus.MustRegister(numShards)
}
// QueueManagerConfig is the configuration for the queue used to write to remote
// storage.
type QueueManagerConfig struct {
// Number of samples to buffer per shard before we start dropping them.
QueueCapacity int
// Max number of shards, i.e. amount of concurrency.
MaxShards int
// Maximum number of samples per send.
MaxSamplesPerSend int
// Maximum time sample will wait in buffer.
BatchSendDeadline time.Duration
// Max number of times to retry a batch on recoverable errors.
MaxRetries int
// On recoverable errors, backoff exponentially.
MinBackoff time.Duration
MaxBackoff time.Duration
}
// defaultQueueManagerConfig is the default remote queue configuration.
var defaultQueueManagerConfig = QueueManagerConfig{
// With a maximum of 1000 shards, assuming an average of 100ms remote write
// time and 100 samples per batch, we will be able to push 1M samples/s.
MaxShards: 1000,
MaxSamplesPerSend: 100,
// By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At
// 1000 shards, this will buffer 100M samples total.
QueueCapacity: 100 * 1000,
BatchSendDeadline: 5 * time.Second,
// Max number of times to retry a batch on recoverable errors.
MaxRetries: 10,
MinBackoff: 30 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
}
// StorageClient defines an interface for sending a batch of samples to an
// external timeseries database.
type StorageClient interface {
// Store stores the given samples in the remote storage.
Store(model.Samples) error
// Name identifies the remote storage implementation.
Name() string
}
// QueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided StorageClient.
type QueueManager struct {
cfg QueueManagerConfig
externalLabels model.LabelSet
relabelConfigs []*config.RelabelConfig
client StorageClient
queueName string
logLimiter *rate.Limiter
shardsMtx sync.Mutex
shards *shards
numShards int
reshardChan chan int
quit chan struct{}
wg sync.WaitGroup
samplesIn, samplesOut, samplesOutDuration *ewmaRate
integralAccumulator float64
}
// NewQueueManager builds a new QueueManager.
func NewQueueManager(cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
t := &QueueManager{
cfg: cfg,
externalLabels: externalLabels,
relabelConfigs: relabelConfigs,
client: client,
queueName: client.Name(),
logLimiter: rate.NewLimiter(logRateLimit, logBurst),
numShards: 1,
reshardChan: make(chan int),
quit: make(chan struct{}),
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration),
samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
}
t.shards = t.newShards(t.numShards)
numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))
return t
}
// Append queues a sample to be sent to the remote storage. It drops the
// sample on the floor if the queue is full.
// Always returns nil.
func (t *QueueManager) Append(s *model.Sample) error {
var snew model.Sample
snew = *s
snew.Metric = s.Metric.Clone()
for ln, lv := range t.externalLabels {
if _, ok := s.Metric[ln]; !ok {
snew.Metric[ln] = lv
}
}
snew.Metric = model.Metric(
relabel.Process(model.LabelSet(snew.Metric), t.relabelConfigs...))
if snew.Metric == nil {
return nil
}
t.shardsMtx.Lock()
enqueued := t.shards.enqueue(&snew)
t.shardsMtx.Unlock()
if enqueued {
queueLength.WithLabelValues(t.queueName).Inc()
} else {
droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
if t.logLimiter.Allow() {
log.Warn("Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.")
}
}
return nil
}
// NeedsThrottling implements storage.SampleAppender. It will always return
// false as a remote storage drops samples on the floor if backlogging instead
// of asking for throttling.
func (*QueueManager) NeedsThrottling() bool {
return false
}
// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
t.wg.Add(2)
go t.updateShardsLoop()
go t.reshardLoop()
t.shardsMtx.Lock()
defer t.shardsMtx.Unlock()
t.shards.start()
}
// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func (t *QueueManager) Stop() {
log.Infof("Stopping remote storage...")
close(t.quit)
t.wg.Wait()
t.shardsMtx.Lock()
defer t.shardsMtx.Unlock()
t.shards.stop()
log.Info("Remote storage stopped.")
}
func (t *QueueManager) updateShardsLoop() {
defer t.wg.Done()
ticker := time.Tick(shardUpdateDuration)
for {
select {
case <-ticker:
t.calculateDesiredShards()
case <-t.quit:
return
}
}
}
func (t *QueueManager) calculateDesiredShards() {
t.samplesIn.tick()
t.samplesOut.tick()
t.samplesOutDuration.tick()
// We use the number of incoming samples as a prediction of how much work we
// will need to do next iteration. We add to this any pending samples
// (received - send) so we can catch up with any backlog. We use the average
// outgoing batch latency to work out how many shards we need.
var (
samplesIn = t.samplesIn.rate()
samplesOut = t.samplesOut.rate()
samplesPending = samplesIn - samplesOut
samplesOutDuration = t.samplesOutDuration.rate()
)
// We use an integral accumulator, like in a PID, to help dampen oscillation.
t.integralAccumulator = t.integralAccumulator + (samplesPending * 0.1)
if samplesOut <= 0 {
return
}
var (
timePerSample = samplesOutDuration / samplesOut
desiredShards = (timePerSample * (samplesIn + samplesPending + t.integralAccumulator)) / float64(time.Second)
)
log.Debugf("QueueManager.caclulateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f",
samplesIn, samplesOut, samplesPending, desiredShards)
// Changes in the number of shards must be greater than shardToleranceFraction.
var (
lowerBound = float64(t.numShards) * (1. - shardToleranceFraction)
upperBound = float64(t.numShards) * (1. + shardToleranceFraction)
)
log.Debugf("QueueManager.updateShardsLoop %f <= %f <= %f", lowerBound, desiredShards, upperBound)
if lowerBound <= desiredShards && desiredShards <= upperBound {
return
}
numShards := int(math.Ceil(desiredShards))
if numShards > t.cfg.MaxShards {
numShards = t.cfg.MaxShards
} else if numShards < 1 {
numShards = 1
}
if numShards == t.numShards {
return
}
// Resharding can take some time, and we want this loop
// to stay close to shardUpdateDuration.
select {
case t.reshardChan <- numShards:
log.Infof("Remote storage resharding from %d to %d shards.", t.numShards, numShards)
t.numShards = numShards
default:
log.Infof("Currently resharding, skipping.")
}
}
func (t *QueueManager) reshardLoop() {
defer t.wg.Done()
for {
select {
case numShards := <-t.reshardChan:
t.reshard(numShards)
case <-t.quit:
return
}
}
}
func (t *QueueManager) reshard(n int) {
numShards.WithLabelValues(t.queueName).Set(float64(n))
t.shardsMtx.Lock()
newShards := t.newShards(n)
oldShards := t.shards
t.shards = newShards
t.shardsMtx.Unlock()
oldShards.stop()
// We start the newShards after we have stopped (the therefore completely
// flushed) the oldShards, to guarantee we only every deliver samples in
// order.
newShards.start()
}
type shards struct {
qm *QueueManager
queues []chan *model.Sample
done chan struct{}
wg sync.WaitGroup
}
func (t *QueueManager) newShards(numShards int) *shards {
queues := make([]chan *model.Sample, numShards)
for i := 0; i < numShards; i++ {
queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity)
}
s := &shards{
qm: t,
queues: queues,
done: make(chan struct{}),
}
s.wg.Add(numShards)
return s
}
func (s *shards) len() int {
return len(s.queues)
}
func (s *shards) start() {
for i := 0; i < len(s.queues); i++ {
go s.runShard(i)
}
}
func (s *shards) stop() {
for _, shard := range s.queues {
close(shard)
}
s.wg.Wait()
}
func (s *shards) enqueue(sample *model.Sample) bool {
s.qm.samplesIn.incr(1)
fp := sample.Metric.FastFingerprint()
shard := uint64(fp) % uint64(len(s.queues))
select {
case s.queues[shard] <- sample:
return true
default:
return false
}
}
func (s *shards) runShard(i int) {
defer s.wg.Done()
queue := s.queues[i]
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline
// anyways.
pendingSamples := model.Samples{}
for {
select {
case sample, ok := <-queue:
if !ok {
if len(pendingSamples) > 0 {
log.Debugf("Flushing %d samples to remote storage...", len(pendingSamples))
s.sendSamples(pendingSamples)
log.Debugf("Done flushing.")
}
return
}
queueLength.WithLabelValues(s.qm.queueName).Dec()
pendingSamples = append(pendingSamples, sample)
for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend {
s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend])
pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:]
}
case <-time.After(s.qm.cfg.BatchSendDeadline):
if len(pendingSamples) > 0 {
s.sendSamples(pendingSamples)
pendingSamples = pendingSamples[:0]
}
}
}
}
func (s *shards) sendSamples(samples model.Samples) {
begin := time.Now()
s.sendSamplesWithBackoff(samples)
// These counters are used to caclulate the dynamic sharding, and as such
// should be maintained irrespective of success or failure.
s.qm.samplesOut.incr(int64(len(samples)))
s.qm.samplesOutDuration.incr(int64(time.Since(begin)))
}
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(samples model.Samples) {
backoff := s.qm.cfg.MinBackoff
for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- {
begin := time.Now()
err := s.qm.client.Store(samples)
sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds())
if err == nil {
succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
return
}
log.Warnf("Error sending %d samples to remote storage: %s", len(samples), err)
if _, ok := err.(recoverableError); !ok {
break
}
time.Sleep(backoff)
backoff = backoff * 2
if backoff > s.qm.cfg.MaxBackoff {
backoff = s.qm.cfg.MaxBackoff
}
}
failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
}

View File

@ -0,0 +1,253 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/prometheus/common/model"
)
type TestStorageClient struct {
receivedSamples map[string]model.Samples
expectedSamples map[string]model.Samples
wg sync.WaitGroup
mtx sync.Mutex
}
func NewTestStorageClient() *TestStorageClient {
return &TestStorageClient{
receivedSamples: map[string]model.Samples{},
expectedSamples: map[string]model.Samples{},
}
}
func (c *TestStorageClient) expectSamples(ss model.Samples) {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, s := range ss {
ts := s.Metric.String()
c.expectedSamples[ts] = append(c.expectedSamples[ts], s)
}
c.wg.Add(len(ss))
}
func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
c.wg.Wait()
c.mtx.Lock()
defer c.mtx.Unlock()
for ts, expectedSamples := range c.expectedSamples {
for i, expected := range expectedSamples {
if !expected.Equal(c.receivedSamples[ts][i]) {
t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i])
}
}
}
}
func (c *TestStorageClient) Store(ss model.Samples) error {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, s := range ss {
ts := s.Metric.String()
c.receivedSamples[ts] = append(c.receivedSamples[ts], s)
}
c.wg.Add(-len(ss))
return nil
}
func (c *TestStorageClient) Name() string {
return "teststorageclient"
}
func TestSampleDelivery(t *testing.T) {
// Let's create an even number of send batches so we don't run into the
// batch timeout case.
n := defaultQueueManagerConfig.QueueCapacity * 2
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
samples = append(samples, &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: name,
},
Value: model.SampleValue(i),
})
}
c := NewTestStorageClient()
c.expectSamples(samples[:len(samples)/2])
cfg := defaultQueueManagerConfig
cfg.MaxShards = 1
m := NewQueueManager(cfg, nil, nil, c)
// These should be received by the client.
for _, s := range samples[:len(samples)/2] {
m.Append(s)
}
// These will be dropped because the queue is full.
for _, s := range samples[len(samples)/2:] {
m.Append(s)
}
m.Start()
defer m.Stop()
c.waitForExpectedSamples(t)
}
func TestSampleDeliveryOrder(t *testing.T) {
ts := 10
n := defaultQueueManagerConfig.MaxSamplesPerSend * ts
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts))
samples = append(samples, &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: name,
},
Value: model.SampleValue(i),
Timestamp: model.Time(i),
})
}
c := NewTestStorageClient()
c.expectSamples(samples)
m := NewQueueManager(defaultQueueManagerConfig, nil, nil, c)
// These should be received by the client.
for _, s := range samples {
m.Append(s)
}
m.Start()
defer m.Stop()
c.waitForExpectedSamples(t)
}
// TestBlockingStorageClient is a queue_manager StorageClient which will block
// on any calls to Store(), until the `block` channel is closed, at which point
// the `numCalls` property will contain a count of how many times Store() was
// called.
type TestBlockingStorageClient struct {
numCalls uint64
block chan bool
}
func NewTestBlockedStorageClient() *TestBlockingStorageClient {
return &TestBlockingStorageClient{
block: make(chan bool),
numCalls: 0,
}
}
func (c *TestBlockingStorageClient) Store(s model.Samples) error {
atomic.AddUint64(&c.numCalls, 1)
<-c.block
return nil
}
func (c *TestBlockingStorageClient) NumCalls() uint64 {
return atomic.LoadUint64(&c.numCalls)
}
func (c *TestBlockingStorageClient) unlock() {
close(c.block)
}
func (c *TestBlockingStorageClient) Name() string {
return "testblockingstorageclient"
}
func (t *QueueManager) queueLen() int {
t.shardsMtx.Lock()
defer t.shardsMtx.Unlock()
queueLength := 0
for _, shard := range t.shards.queues {
queueLength += len(shard)
}
return queueLength
}
func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
// Our goal is to fully empty the queue:
// `MaxSamplesPerSend*Shards` samples should be consumed by the
// per-shard goroutines, and then another `MaxSamplesPerSend`
// should be left on the queue.
n := defaultQueueManagerConfig.MaxSamplesPerSend * 2
samples := make(model.Samples, 0, n)
for i := 0; i < n; i++ {
name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
samples = append(samples, &model.Sample{
Metric: model.Metric{
model.MetricNameLabel: name,
},
Value: model.SampleValue(i),
})
}
c := NewTestBlockedStorageClient()
cfg := defaultQueueManagerConfig
cfg.MaxShards = 1
cfg.QueueCapacity = n
m := NewQueueManager(cfg, nil, nil, c)
m.Start()
defer func() {
c.unlock()
m.Stop()
}()
for _, s := range samples {
m.Append(s)
}
// Wait until the runShard() loops drain the queue. If things went right, it
// should then immediately block in sendSamples(), but, in case of error,
// it would spawn too many goroutines, and thus we'd see more calls to
// client.Store()
//
// The timed wait is maybe non-ideal, but, in order to verify that we're
// not spawning too many concurrent goroutines, we have to wait on the
// Run() loop to consume a specific number of elements from the
// queue... and it doesn't signal that in any obvious way, except by
// draining the queue. We cap the waiting at 1 second -- that should give
// plenty of time, and keeps the failure fairly quick if we're not draining
// the queue properly.
for i := 0; i < 100 && m.queueLen() > 0; i++ {
time.Sleep(10 * time.Millisecond)
}
if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend {
t.Fatalf("Failed to drain QueueManager queue, %d elements left",
m.queueLen(),
)
}
numCalls := c.NumCalls()
if numCalls != uint64(1) {
t.Errorf("Saw %d concurrent sends, expected 1", numCalls)
}
}

247
storage/remote/read.go Normal file
View File

@ -0,0 +1,247 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"context"
"sort"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
)
// Querier returns a new Querier on the storage.
func (r *Storage) Querier(mint, maxt int64) (storage.Querier, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
queriers := make([]storage.Querier, 0, len(r.clients))
for _, c := range r.clients {
queriers = append(queriers, &querier{
mint: mint,
maxt: maxt,
client: c,
externalLabels: r.externalLabels,
})
}
return storage.NewMergeQuerier(queriers), nil
}
// Querier is an adapter to make a Client usable as a storage.Querier.
type querier struct {
mint, maxt int64
client *Client
externalLabels model.LabelSet
}
// Select returns a set of series that matches the given label matchers.
func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
m, added := q.addExternalLabels(matchers)
res, err := q.client.Read(context.TODO(), q.mint, q.maxt, labelMatchersToProto(m))
if err != nil {
return errSeriesSet{err: err}
}
series := make([]storage.Series, 0, len(res))
for _, ts := range res {
labels := labelPairsToLabels(ts.Labels)
removeLabels(labels, added)
series = append(series, &concreteSeries{
labels: labels,
samples: ts.Samples,
})
}
sort.Sort(byLabel(series))
return &concreteSeriesSet{
series: series,
}
}
func labelMatchersToProto(matchers []*labels.Matcher) []*prompb.LabelMatcher {
pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers))
for _, m := range matchers {
var mType prompb.LabelMatcher_Type
switch m.Type {
case labels.MatchEqual:
mType = prompb.LabelMatcher_EQ
case labels.MatchNotEqual:
mType = prompb.LabelMatcher_NEQ
case labels.MatchRegexp:
mType = prompb.LabelMatcher_RE
case labels.MatchNotRegexp:
mType = prompb.LabelMatcher_NRE
default:
panic("invalid matcher type")
}
pbMatchers = append(pbMatchers, &prompb.LabelMatcher{
Type: mType,
Name: string(m.Name),
Value: string(m.Value),
})
}
return pbMatchers
}
func labelPairsToLabels(labelPairs []*prompb.Label) labels.Labels {
result := make(labels.Labels, 0, len(labelPairs))
for _, l := range labelPairs {
result = append(result, labels.Label{
Name: l.Name,
Value: l.Value,
})
}
sort.Sort(result)
return result
}
type byLabel []storage.Series
func (a byLabel) Len() int { return len(a) }
func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(name string) ([]string, error) {
// TODO implement?
return nil, nil
}
// Close releases the resources of the Querier.
func (q *querier) Close() error {
return nil
}
// errSeriesSet implements storage.SeriesSet, just returning an error.
type errSeriesSet struct {
err error
}
func (errSeriesSet) Next() bool {
return false
}
func (errSeriesSet) At() storage.Series {
return nil
}
func (e errSeriesSet) Err() error {
return e.err
}
// concreteSeriesSet implements storage.SeriesSet.
type concreteSeriesSet struct {
cur int
series []storage.Series
}
func (c *concreteSeriesSet) Next() bool {
c.cur++
return c.cur < len(c.series)
}
func (c *concreteSeriesSet) At() storage.Series {
return c.series[c.cur]
}
func (c *concreteSeriesSet) Err() error {
return nil
}
// concreteSeries implementes storage.Series.
type concreteSeries struct {
labels labels.Labels
samples []*prompb.Sample
}
func (c *concreteSeries) Labels() labels.Labels {
return c.labels
}
func (c *concreteSeries) Iterator() storage.SeriesIterator {
return newConcreteSeriersIterator(c)
}
// concreteSeriesIterator implements storage.SeriesIterator.
type concreteSeriesIterator struct {
cur int
series *concreteSeries
}
func newConcreteSeriersIterator(series *concreteSeries) storage.SeriesIterator {
return &concreteSeriesIterator{
cur: -1,
series: series,
}
}
func (c *concreteSeriesIterator) Seek(t int64) bool {
c.cur = sort.Search(len(c.series.samples), func(n int) bool {
return c.series.samples[n].Timestamp >= t
})
return c.cur < len(c.series.samples)
}
func (c *concreteSeriesIterator) At() (t int64, v float64) {
s := c.series.samples[c.cur]
return s.Timestamp, s.Value
}
func (c *concreteSeriesIterator) Next() bool {
c.cur++
return c.cur < len(c.series.samples)
}
func (c *concreteSeriesIterator) Err() error {
return nil
}
// addExternalLabels adds matchers for each external label. External labels
// that already have a corresponding user-supplied matcher are skipped, as we
// assume that the user explicitly wants to select a different value for them.
// We return the new set of matchers, along with a map of labels for which
// matchers were added, so that these can later be removed from the result
// time series again.
func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) {
el := make(model.LabelSet, len(q.externalLabels))
for k, v := range q.externalLabels {
el[k] = v
}
for _, m := range matchers {
if _, ok := el[model.LabelName(m.Name)]; ok {
delete(el, model.LabelName(m.Name))
}
}
for k, v := range el {
m, err := labels.NewMatcher(labels.MatchEqual, string(k), string(v))
if err != nil {
panic(err)
}
matchers = append(matchers, m)
}
return matchers, el
}
func removeLabels(l labels.Labels, toDelete model.LabelSet) {
for i := 0; i < len(l); {
if _, ok := toDelete[model.LabelName(l[i].Name)]; ok {
l = l[:i+copy(l[i:], l[i+1:])]
} else {
i++
}
}
}

View File

@ -0,0 +1,94 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"reflect"
"sort"
"testing"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher {
m, err := labels.NewMatcher(mt, name, val)
if err != nil {
panic(err)
}
return m
}
func TestAddExternalLabels(t *testing.T) {
tests := []struct {
el model.LabelSet
inMatchers []*labels.Matcher
outMatchers []*labels.Matcher
added model.LabelSet
}{
{
el: model.LabelSet{},
inMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
},
outMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
},
added: model.LabelSet{},
},
{
el: model.LabelSet{"region": "europe", "dc": "berlin-01"},
inMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
},
outMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
mustNewLabelMatcher(labels.MatchEqual, "region", "europe"),
mustNewLabelMatcher(labels.MatchEqual, "dc", "berlin-01"),
},
added: model.LabelSet{"region": "europe", "dc": "berlin-01"},
},
{
el: model.LabelSet{"region": "europe", "dc": "berlin-01"},
inMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"),
},
outMatchers: []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
mustNewLabelMatcher(labels.MatchEqual, "region", "europe"),
mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"),
},
added: model.LabelSet{"region": "europe"},
},
}
for i, test := range tests {
q := querier{
externalLabels: test.el,
}
matchers, added := q.addExternalLabels(test.inMatchers)
sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name })
sort.Slice(matchers, func(i, j int) bool { return matchers[i].Name < matchers[j].Name })
if !reflect.DeepEqual(matchers, test.outMatchers) {
t.Fatalf("%d. unexpected matchers; want %v, got %v", i, test.outMatchers, matchers)
}
if !reflect.DeepEqual(added, test.added) {
t.Fatalf("%d. unexpected added labels; want %v, got %v", i, test.added, added)
}
}
}

103
storage/remote/storage.go Normal file
View File

@ -0,0 +1,103 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.package remote
package remote
import (
"sync"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
)
// Storage represents all the remote read and write endpoints. It implements
// storage.Storage.
type Storage struct {
mtx sync.RWMutex
// For writes
queues []*QueueManager
// For reads
clients []*Client
externalLabels model.LabelSet
}
// ApplyConfig updates the state as the new config requires.
func (s *Storage) ApplyConfig(conf *config.Config) error {
s.mtx.Lock()
defer s.mtx.Unlock()
// Update write queues
newQueues := []*QueueManager{}
// TODO: we should only stop & recreate queues which have changes,
// as this can be quite disruptive.
for i, rwConf := range conf.RemoteWriteConfigs {
c, err := NewClient(i, &clientConfig{
url: rwConf.URL,
timeout: rwConf.RemoteTimeout,
httpClientConfig: rwConf.HTTPClientConfig,
})
if err != nil {
return err
}
newQueues = append(newQueues, NewQueueManager(
defaultQueueManagerConfig,
conf.GlobalConfig.ExternalLabels,
rwConf.WriteRelabelConfigs,
c,
))
}
for _, q := range s.queues {
q.Stop()
}
s.queues = newQueues
for _, q := range s.queues {
q.Start()
}
// Update read clients
clients := []*Client{}
for i, rrConf := range conf.RemoteReadConfigs {
c, err := NewClient(i, &clientConfig{
url: rrConf.URL,
timeout: rrConf.RemoteTimeout,
httpClientConfig: rrConf.HTTPClientConfig,
})
if err != nil {
return err
}
clients = append(clients, c)
}
s.clients = clients
s.externalLabels = conf.GlobalConfig.ExternalLabels
return nil
}
// Close the background processing of the storage queues.
func (s *Storage) Close() error {
s.mtx.Lock()
defer s.mtx.Unlock()
for _, q := range s.queues {
q.Stop()
}
return nil
}

58
storage/remote/write.go Normal file
View File

@ -0,0 +1,58 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
)
func (s *Storage) Appender() (storage.Appender, error) {
return s, nil
}
func (s *Storage) Add(l labels.Labels, t int64, v float64) (string, error) {
s.mtx.RLock()
defer s.mtx.RUnlock()
for _, q := range s.queues {
q.Append(&model.Sample{
Metric: labelsToMetric(l),
Timestamp: model.Time(t),
Value: model.SampleValue(v),
})
}
return "", nil
}
func labelsToMetric(ls labels.Labels) model.Metric {
metric := make(model.Metric, len(ls))
for _, l := range ls {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
return metric
}
func (s *Storage) AddFast(l labels.Labels, _ string, t int64, v float64) error {
_, err := s.Add(l, t, v)
return err
}
func (*Storage) Commit() error {
return nil
}
func (*Storage) Rollback() error {
return nil
}

View File

@ -135,7 +135,7 @@ func (a appender) Add(lset labels.Labels, t int64, v float64) (string, error) {
return ref, err
}
func (a appender) AddFast(ref string, t int64, v float64) error {
func (a appender) AddFast(_ labels.Labels, ref string, t int64, v float64) error {
err := a.a.AddFast(ref, t, v)
switch errors.Cause(err) {

27
vendor/golang.org/x/time/LICENSE generated vendored Normal file
View File

@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

22
vendor/golang.org/x/time/PATENTS generated vendored Normal file
View File

@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

380
vendor/golang.org/x/time/rate/rate.go generated vendored Normal file
View File

@ -0,0 +1,380 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package rate provides a rate limiter.
package rate
import (
"fmt"
"math"
"sync"
"time"
)
// Limit defines the maximum frequency of some events.
// Limit is represented as number of events per second.
// A zero Limit allows no events.
type Limit float64
// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)
// Every converts a minimum time interval between events to a Limit.
func Every(interval time.Duration) Limit {
if interval <= 0 {
return Inf
}
return 1 / Limit(interval.Seconds())
}
// A Limiter controls how frequently events are allowed to happen.
// It implements a "token bucket" of size b, initially full and refilled
// at rate r tokens per second.
// Informally, in any large enough time interval, the Limiter limits the
// rate to r tokens per second, with a maximum burst size of b events.
// As a special case, if r == Inf (the infinite rate), b is ignored.
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
//
// The zero value is a valid Limiter, but it will reject all events.
// Use NewLimiter to create non-zero Limiters.
//
// Limiter has three main methods, Allow, Reserve, and Wait.
// Most callers should use Wait.
//
// Each of the three methods consumes a single token.
// They differ in their behavior when no token is available.
// If no token is available, Allow returns false.
// If no token is available, Reserve returns a reservation for a future token
// and the amount of time the caller must wait before using it.
// If no token is available, Wait blocks until one can be obtained
// or its associated context.Context is canceled.
//
// The methods AllowN, ReserveN, and WaitN consume n tokens.
type Limiter struct {
limit Limit
burst int
mu sync.Mutex
tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}
// Limit returns the maximum overall event rate.
func (lim *Limiter) Limit() Limit {
lim.mu.Lock()
defer lim.mu.Unlock()
return lim.limit
}
// Burst returns the maximum burst size. Burst is the maximum number of tokens
// that can be consumed in a single call to Allow, Reserve, or Wait, so higher
// Burst values allow more events to happen at once.
// A zero Burst allows no events, unless limit == Inf.
func (lim *Limiter) Burst() int {
return lim.burst
}
// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok
}
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens int
timeToAct time.Time
// This is the Limit at reservation time, it can change later.
limit Limit
}
// OK returns whether the limiter can provide the requested number of tokens
// within the maximum wait time. If OK is false, Delay returns InfDuration, and
// Cancel does nothing.
func (r *Reservation) OK() bool {
return r.ok
}
// Delay is shorthand for DelayFrom(time.Now()).
func (r *Reservation) Delay() time.Duration {
return r.DelayFrom(time.Now())
}
// InfDuration is the duration returned by Delay when a Reservation is not OK.
const InfDuration = time.Duration(1<<63 - 1)
// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
// InfDuration means the limiter cannot grant the tokens requested in this
// Reservation within the maximum wait time.
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
if !r.ok {
return InfDuration
}
delay := r.timeToAct.Sub(now)
if delay < 0 {
return 0
}
return delay
}
// Cancel is shorthand for CancelAt(time.Now()).
func (r *Reservation) Cancel() {
r.CancelAt(time.Now())
return
}
// CancelAt indicates that the reservation holder will not perform the reserved action
// and reverses the effects of this Reservation on the rate limit as much as possible,
// considering that other reservations may have already been made.
func (r *Reservation) CancelAt(now time.Time) {
if !r.ok {
return
}
r.lim.mu.Lock()
defer r.lim.mu.Unlock()
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
return
}
// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
// advance time to now
now, _, tokens := r.lim.advance(now)
// calculate new number of tokens
tokens += restoreTokens
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}
// update state
r.lim.last = now
r.lim.tokens = tokens
if r.timeToAct == r.lim.lastEvent {
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(now) {
r.lim.lastEvent = prevEvent
}
}
return
}
// Reserve is shorthand for ReserveN(time.Now(), 1).
func (lim *Limiter) Reserve() *Reservation {
return lim.ReserveN(time.Now(), 1)
}
// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// ReserveN returns false if n exceeds the Limiter's burst size.
// Usage example:
// r := lim.ReserveN(time.Now(), 1)
// if !r.OK() {
// // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
// return
// }
// time.Sleep(r.Delay())
// Act()
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
// If you need to respect a deadline or cancel the delay, use Wait instead.
// To drop or skip events exceeding rate limit, use Allow instead.
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
r := lim.reserveN(now, n, InfDuration)
return &r
}
// contextContext is a temporary(?) copy of the context.Context type
// to support both Go 1.6 using golang.org/x/net/context and Go 1.7+
// with the built-in context package. If people ever stop using Go 1.6
// we can remove this.
type contextContext interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) wait(ctx contextContext) (err error) {
return lim.WaitN(ctx, 1)
}
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) waitN(ctx contextContext, n int) (err error) {
if n > lim.burst && lim.limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
}
// Check if ctx is already cancelled
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Determine wait limit
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
// Reserve
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// Wait
t := time.NewTimer(r.DelayFrom(now))
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
}
// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
func (lim *Limiter) SetLimit(newLimit Limit) {
lim.SetLimitAt(time.Now(), newLimit)
}
// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
// or underutilized by those which reserved (using Reserve or Wait) but did not yet act
// before SetLimitAt was called.
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) {
lim.mu.Lock()
defer lim.mu.Unlock()
now, _, tokens := lim.advance(now)
lim.last = now
lim.tokens = tokens
lim.limit = newLimit
}
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Avoid making delta overflow below when last is very old.
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}
// Calculate the new number of tokens, due to time that passed.
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
// durationFromTokens is a unit conversion function from the number of tokens to the duration
// of time it takes to accumulate them at a rate of limit tokens per second.
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
seconds := tokens / float64(limit)
return time.Nanosecond * time.Duration(1e9*seconds)
}
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
return d.Seconds() * float64(limit)
}

21
vendor/golang.org/x/time/rate/rate_go16.go generated vendored Normal file
View File

@ -0,0 +1,21 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.7
package rate
import "golang.org/x/net/context"
// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {
return lim.waitN(ctx, 1)
}
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
return lim.waitN(ctx, n)
}

21
vendor/golang.org/x/time/rate/rate_go17.go generated vendored Normal file
View File

@ -0,0 +1,21 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
package rate
import "context"
// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {
return lim.waitN(ctx, 1)
}
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
return lim.waitN(ctx, n)
}

6
vendor/vendor.json vendored
View File

@ -1097,6 +1097,12 @@
"revision": "c589d0c9f0d81640c518354c7bcae77d99820aa3",
"revisionTime": "2016-09-30T00:14:02Z"
},
{
"checksumSHA1": "vGfePfr0+weQUeTM/71mu+LCFuE=",
"path": "golang.org/x/time/rate",
"revision": "8be79e1e0910c292df4e79c241bb7e8f7e725959",
"revisionTime": "2017-04-24T23:28:54Z"
},
{
"checksumSHA1": "AjdmRXf0fiy6Bec9mNlsGsmZi1k=",
"path": "google.golang.org/api/compute/v1",