Browse Source

Reduce memory usage of remote read by reducing pointer usage. (#4655)

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
pull/4386/head
Tom Wilkie 6 years ago committed by GitHub
parent
commit
d3a1ff1abf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go
  2. 58
      prompb/types.pb.go
  3. 2
      prompb/types.proto
  4. 8
      storage/remote/codec.go
  5. 4
      storage/remote/codec_test.go
  6. 14
      storage/remote/queue_manager_test.go
  7. 4
      storage/remote/read_test.go
  8. 5
      web/api/v1/api_test.go

10
documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go

@ -248,8 +248,8 @@ func tagsToLabelPairs(name string, tags map[string]string) []*prompb.Label {
return pairs
}
func valuesToSamples(values [][]interface{}) ([]*prompb.Sample, error) {
samples := make([]*prompb.Sample, 0, len(values))
func valuesToSamples(values [][]interface{}) ([]prompb.Sample, error) {
samples := make([]prompb.Sample, 0, len(values))
for _, v := range values {
if len(v) != 2 {
return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v)
@ -275,7 +275,7 @@ func valuesToSamples(values [][]interface{}) ([]*prompb.Sample, error) {
return nil, fmt.Errorf("unable to convert sample value to float64: %v", err)
}
samples = append(samples, &prompb.Sample{
samples = append(samples, prompb.Sample{
Timestamp: timestamp,
Value: value,
})
@ -285,8 +285,8 @@ func valuesToSamples(values [][]interface{}) ([]*prompb.Sample, error) {
// mergeSamples merges two lists of sample pairs and removes duplicate
// timestamps. It assumes that both lists are sorted by timestamp.
func mergeSamples(a, b []*prompb.Sample) []*prompb.Sample {
result := make([]*prompb.Sample, 0, len(a)+len(b))
func mergeSamples(a, b []prompb.Sample) []prompb.Sample {
result := make([]prompb.Sample, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].Timestamp < b[j].Timestamp {

58
prompb/types.pb.go

@ -66,8 +66,8 @@ func (m *Sample) GetTimestamp() int64 {
}
type TimeSeries struct {
Labels []*Label `protobuf:"bytes,1,rep,name=labels" json:"labels,omitempty"`
Samples []*Sample `protobuf:"bytes,2,rep,name=samples" json:"samples,omitempty"`
Labels []*Label `protobuf:"bytes,1,rep,name=labels" json:"labels,omitempty"`
Samples []Sample `protobuf:"bytes,2,rep,name=samples" json:"samples"`
}
func (m *TimeSeries) Reset() { *m = TimeSeries{} }
@ -82,7 +82,7 @@ func (m *TimeSeries) GetLabels() []*Label {
return nil
}
func (m *TimeSeries) GetSamples() []*Sample {
func (m *TimeSeries) GetSamples() []Sample {
if m != nil {
return m.Samples
}
@ -720,7 +720,7 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Samples = append(m.Samples, &Sample{})
m.Samples = append(m.Samples, Sample{})
if err := m.Samples[len(m.Samples)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
@ -1306,29 +1306,29 @@ var (
func init() { proto.RegisterFile("types.proto", fileDescriptorTypes) }
var fileDescriptorTypes = []byte{
// 380 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0xcd, 0x0a, 0xd3, 0x40,
0x14, 0x85, 0x3b, 0x49, 0x3a, 0xb1, 0xb7, 0x22, 0x71, 0xa8, 0x18, 0x45, 0x6b, 0xc9, 0x2a, 0x82,
0xa4, 0xb4, 0xae, 0x04, 0x57, 0x85, 0x80, 0x8b, 0x46, 0xe8, 0xb4, 0x2b, 0x37, 0x32, 0x6d, 0xaf,
0x6d, 0x24, 0x3f, 0x43, 0x66, 0x2a, 0xf4, 0x41, 0x7c, 0xa7, 0x2e, 0x7d, 0x02, 0x91, 0x3e, 0x89,
0xcc, 0xa4, 0xb5, 0x05, 0x05, 0x77, 0xf7, 0x9c, 0x7b, 0x2e, 0xe7, 0x4b, 0x18, 0xe8, 0xeb, 0xa3,
0x44, 0x95, 0xc8, 0xa6, 0xd6, 0x35, 0x03, 0xd9, 0xd4, 0x25, 0xea, 0x3d, 0x1e, 0xd4, 0xf3, 0xc1,
0xae, 0xde, 0xd5, 0xd6, 0x1e, 0x9b, 0xa9, 0x4d, 0x44, 0xef, 0x81, 0x2e, 0x45, 0x29, 0x0b, 0x64,
0x03, 0xe8, 0x7e, 0x13, 0xc5, 0x01, 0x43, 0x32, 0x22, 0x31, 0xe1, 0xad, 0x60, 0x2f, 0xa0, 0xa7,
0xf3, 0x12, 0x95, 0x16, 0xa5, 0x0c, 0x9d, 0x11, 0x89, 0x5d, 0x7e, 0x33, 0x22, 0x04, 0x58, 0xe5,
0x25, 0x2e, 0xb1, 0xc9, 0x51, 0xb1, 0xd7, 0x40, 0x0b, 0xb1, 0xc6, 0x42, 0x85, 0x64, 0xe4, 0xc6,
0xfd, 0xe9, 0xe3, 0xe4, 0x56, 0x9f, 0xcc, 0xcd, 0x86, 0x5f, 0x02, 0xec, 0x0d, 0xf8, 0xca, 0xd6,
0xaa, 0xd0, 0xb1, 0x59, 0x76, 0x9f, 0x6d, 0x89, 0xf8, 0x35, 0x12, 0x4d, 0xa0, 0x6b, 0xcf, 0x19,
0x03, 0xaf, 0x12, 0x65, 0x8b, 0xd8, 0xe3, 0x76, 0xbe, 0x71, 0x3b, 0xd6, 0x6c, 0x45, 0xf4, 0x0e,
0xe8, 0xbc, 0xad, 0x1a, 0xff, 0x97, 0x6a, 0xe6, 0x9d, 0x7e, 0xbe, 0xea, 0x5c, 0xd9, 0xa2, 0xef,
0x04, 0x1e, 0x5a, 0x3f, 0x13, 0x7a, 0xb3, 0xc7, 0x86, 0x4d, 0xc0, 0x33, 0x3f, 0xd5, 0xb6, 0x3e,
0x9a, 0xbe, 0xfc, 0xeb, 0xfe, 0x92, 0x4b, 0x56, 0x47, 0x89, 0xdc, 0x46, 0xff, 0x80, 0x3a, 0xff,
0x02, 0x75, 0xef, 0x41, 0x63, 0xf0, 0xcc, 0x1d, 0xa3, 0xe0, 0xa4, 0x8b, 0xa0, 0xc3, 0x7c, 0x70,
0x3f, 0xa6, 0x8b, 0x80, 0x18, 0x83, 0xa7, 0x81, 0x63, 0x0d, 0x9e, 0x06, 0x6e, 0xf4, 0x15, 0x7a,
0x1c, 0xc5, 0xf6, 0x43, 0x5e, 0x69, 0xc5, 0x9e, 0x82, 0xaf, 0x34, 0xca, 0xcf, 0xa5, 0xb2, 0x58,
0x2e, 0xa7, 0x46, 0x66, 0xca, 0x34, 0x7f, 0x39, 0x54, 0x9b, 0x6b, 0xb3, 0x99, 0xd9, 0x33, 0x78,
0xa0, 0xb4, 0x68, 0xb4, 0x49, 0xbb, 0x36, 0xed, 0x5b, 0x9d, 0x29, 0xf6, 0x04, 0x28, 0x56, 0x5b,
0xb3, 0xf0, 0xec, 0xa2, 0x8b, 0xd5, 0x36, 0x53, 0xb3, 0xc1, 0xe9, 0x3c, 0x24, 0x3f, 0xce, 0x43,
0xf2, 0xeb, 0x3c, 0x24, 0x9f, 0xa8, 0xf9, 0x62, 0xb9, 0x5e, 0x53, 0xfb, 0x66, 0xde, 0xfe, 0x0e,
0x00, 0x00, 0xff, 0xff, 0x32, 0x7f, 0x5e, 0x49, 0x64, 0x02, 0x00, 0x00,
// 383 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0xcf, 0xca, 0xd3, 0x40,
0x14, 0xc5, 0xbf, 0x49, 0xd2, 0x89, 0xbd, 0x9f, 0x48, 0x1c, 0x2a, 0x46, 0xd1, 0x5a, 0xb2, 0x8a,
0x9b, 0x94, 0xd6, 0x95, 0xe0, 0xaa, 0x10, 0x70, 0xd1, 0x08, 0x9d, 0x76, 0xe5, 0x46, 0xa6, 0xed,
0xb5, 0x8d, 0xe6, 0xcf, 0x90, 0x99, 0x0a, 0x7d, 0x10, 0xdf, 0xa9, 0x4b, 0x9f, 0x40, 0xa4, 0x4f,
0x22, 0x33, 0x49, 0x6d, 0x41, 0xc1, 0xdd, 0x3d, 0xe7, 0xfe, 0x6e, 0xce, 0x49, 0x08, 0xdc, 0xeb,
0xa3, 0x44, 0x95, 0xc8, 0xa6, 0xd6, 0x35, 0x03, 0xd9, 0xd4, 0x25, 0xea, 0x3d, 0x1e, 0xd4, 0xf3,
0xc1, 0xae, 0xde, 0xd5, 0xd6, 0x1e, 0x9b, 0xa9, 0x25, 0xa2, 0x77, 0x40, 0x97, 0xa2, 0x94, 0x05,
0xb2, 0x01, 0xf4, 0xbe, 0x89, 0xe2, 0x80, 0x21, 0x19, 0x91, 0x98, 0xf0, 0x56, 0xb0, 0x17, 0xd0,
0xd7, 0x79, 0x89, 0x4a, 0x8b, 0x52, 0x86, 0xce, 0x88, 0xc4, 0x2e, 0xbf, 0x1a, 0xd1, 0x57, 0x80,
0x55, 0x5e, 0xe2, 0x12, 0x9b, 0x1c, 0x15, 0x7b, 0x0d, 0xb4, 0x10, 0x6b, 0x2c, 0x54, 0x48, 0x46,
0x6e, 0x7c, 0x3f, 0x7d, 0x9c, 0x5c, 0xe3, 0x93, 0xb9, 0xd9, 0xf0, 0x0e, 0x60, 0x53, 0xf0, 0x95,
0x8d, 0x55, 0xa1, 0x63, 0x59, 0x76, 0xcb, 0xb6, 0x8d, 0x66, 0xde, 0xe9, 0xe7, 0xab, 0x3b, 0x7e,
0x01, 0xa3, 0x09, 0xf4, 0xec, 0x43, 0x18, 0x03, 0xaf, 0x12, 0x65, 0x5b, 0xb4, 0xcf, 0xed, 0x7c,
0x6d, 0xef, 0x58, 0xb3, 0x15, 0xd1, 0x5b, 0xa0, 0xf3, 0x36, 0x70, 0xfc, 0xdf, 0x6e, 0x5d, 0x5c,
0x87, 0x45, 0xdf, 0x09, 0x3c, 0xb4, 0x7e, 0x26, 0xf4, 0x66, 0x8f, 0x0d, 0x9b, 0x80, 0x67, 0x3e,
0xad, 0x4d, 0x7d, 0x34, 0x7d, 0xf9, 0xd7, 0x7d, 0xc7, 0x25, 0xab, 0xa3, 0x44, 0x6e, 0xd1, 0x3f,
0x45, 0x9d, 0x7f, 0x15, 0x75, 0x6f, 0x8b, 0xc6, 0xe0, 0x99, 0x3b, 0x46, 0xc1, 0x49, 0x17, 0xc1,
0x1d, 0xf3, 0xc1, 0xfd, 0x90, 0x2e, 0x02, 0x62, 0x0c, 0x9e, 0x06, 0x8e, 0x35, 0x78, 0x1a, 0xb8,
0xd1, 0x17, 0xe8, 0x73, 0x14, 0xdb, 0xf7, 0x79, 0xa5, 0x15, 0x7b, 0x0a, 0xbe, 0xd2, 0x28, 0x3f,
0x95, 0xca, 0xd6, 0x72, 0x39, 0x35, 0x32, 0x53, 0x26, 0xf9, 0xf3, 0xa1, 0xda, 0x5c, 0x92, 0xcd,
0xcc, 0x9e, 0xc1, 0x03, 0xa5, 0x45, 0xa3, 0x0d, 0xed, 0x5a, 0xda, 0xb7, 0x3a, 0x53, 0xec, 0x09,
0x50, 0xac, 0xb6, 0x66, 0xe1, 0xd9, 0x45, 0x0f, 0xab, 0x6d, 0xa6, 0x66, 0x83, 0xd3, 0x79, 0x48,
0x7e, 0x9c, 0x87, 0xe4, 0xd7, 0x79, 0x48, 0x3e, 0x52, 0xf3, 0xc6, 0x72, 0xbd, 0xa6, 0xf6, 0xcf,
0x79, 0xf3, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x44, 0x38, 0x1e, 0x5b, 0x6a, 0x02, 0x00, 0x00,
}

2
prompb/types.proto

@ -25,7 +25,7 @@ message Sample {
message TimeSeries {
repeated Label labels = 1;
repeated Sample samples = 2;
repeated Sample samples = 2 [(gogoproto.nullable) = false];
}
message Label {

8
storage/remote/codec.go

@ -89,7 +89,7 @@ func ToWriteRequest(samples []*model.Sample) *prompb.WriteRequest {
for _, s := range samples {
ts := prompb.TimeSeries{
Labels: MetricToLabelProtos(s.Metric),
Samples: []*prompb.Sample{
Samples: []prompb.Sample{
{
Value: float64(s.Value),
Timestamp: int64(s.Timestamp),
@ -153,7 +153,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
for ss.Next() {
series := ss.At()
iter := series.Iterator()
samples := []*prompb.Sample{}
samples := []prompb.Sample{}
for iter.Next() {
numSamples++
@ -164,7 +164,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
}
}
ts, val := iter.At()
samples = append(samples, &prompb.Sample{
samples = append(samples, prompb.Sample{
Timestamp: ts,
Value: val,
})
@ -249,7 +249,7 @@ func (c *concreteSeriesSet) Err() error {
// concreteSeries implements storage.Series.
type concreteSeries struct {
labels labels.Labels
samples []*prompb.Sample
samples []prompb.Sample
}
func (c *concreteSeries) Labels() labels.Labels {

4
storage/remote/codec_test.go

@ -101,11 +101,11 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
func TestConcreteSeriesSet(t *testing.T) {
series1 := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
samples: []*prompb.Sample{&prompb.Sample{Value: 1, Timestamp: 2}},
samples: []prompb.Sample{prompb.Sample{Value: 1, Timestamp: 2}},
}
series2 := &concreteSeries{
labels: labels.FromStrings("foo", "baz"),
samples: []*prompb.Sample{&prompb.Sample{Value: 3, Timestamp: 4}},
samples: []prompb.Sample{prompb.Sample{Value: 3, Timestamp: 4}},
}
c := &concreteSeriesSet{
series: []storage.Series{series1, series2},

14
storage/remote/queue_manager_test.go

@ -30,16 +30,16 @@ import (
const defaultFlushDeadline = 1 * time.Minute
type TestStorageClient struct {
receivedSamples map[string][]*prompb.Sample
expectedSamples map[string][]*prompb.Sample
receivedSamples map[string][]prompb.Sample
expectedSamples map[string][]prompb.Sample
wg sync.WaitGroup
mtx sync.Mutex
}
func NewTestStorageClient() *TestStorageClient {
return &TestStorageClient{
receivedSamples: map[string][]*prompb.Sample{},
expectedSamples: map[string][]*prompb.Sample{},
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
}
}
@ -47,12 +47,12 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.expectedSamples = map[string][]*prompb.Sample{}
c.receivedSamples = map[string][]*prompb.Sample{}
c.expectedSamples = map[string][]prompb.Sample{}
c.receivedSamples = map[string][]prompb.Sample{}
for _, s := range ss {
ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String()
c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{
c.expectedSamples[ts] = append(c.expectedSamples[ts], prompb.Sample{
Timestamp: int64(s.Timestamp),
Value: float64(s.Value),
})

4
storage/remote/read_test.go

@ -122,12 +122,12 @@ func TestSeriesSetFilter(t *testing.T) {
toRemove: model.LabelSet{"foo": "bar"},
in: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
{Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b")), Samples: []*prompb.Sample{}},
{Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b")), Samples: []prompb.Sample{}},
},
},
expected: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{
{Labels: labelsToLabelsProto(labels.FromStrings("a", "b")), Samples: []*prompb.Sample{}},
{Labels: labelsToLabelsProto(labels.FromStrings("a", "b")), Samples: []prompb.Sample{}},
},
},
},

5
web/api/v1/api_test.go

@ -19,7 +19,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/go-kit/kit/log"
"io/ioutil"
"math"
"net/http"
@ -30,6 +29,8 @@ import (
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
config_util "github.com/prometheus/common/config"
@ -897,7 +898,7 @@ func TestReadEndpoint(t *testing.T) {
{Name: "d", Value: "e"},
{Name: "foo", Value: "bar"},
},
Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
},
},
}

Loading…
Cancel
Save