Browse Source

prompb: Modify layout of histograms

Note: This is deliberately an incompatible change. Since we have never
used histograms in remote read/write yet, there is no point in keeping
compatibility. This _is_, however, compatible to the state in the main
branch.

This commit flattens the bucket message into top-level fields. This
has the disadvantage of now having two triples of fields prefixed with
`negative_...` or `positive_...`. However, with this structure, we
save one tag on the wire. And, perhaps more importantly, we mirror the
structure of the `histogram.Histogram` Go type.

This commit also adjusts `repeated` fields to use names in the plural
form, as it is also the case for the fields that already existed.

This also adds a doc comment to `HistogramProtoToHistogram` and
changes its return type to a pointer (which is more convenient and
probably more efficient).

Signed-off-by: beorn7 <beorn@grafana.com>
pull/11011/head
beorn7 2 years ago
parent
commit
87351f2318
  1. 2
      documentation/examples/remote_storage/example_write_adapter/server.go
  2. 858
      prompb/types.pb.go
  3. 54
      prompb/types.proto
  4. 60
      storage/remote/codec.go
  5. 2
      storage/remote/write_handler.go
  6. 2
      storage/remote/write_handler_test.go

2
documentation/examples/remote_storage/example_write_adapter/server.go

@ -52,7 +52,7 @@ func main() {
for _, hp := range ts.Histograms {
h := remote.HistogramProtoToHistogram(hp)
fmt.Printf("\tHistogram: %s\n", (&h).String())
fmt.Printf("\tHistogram: %s\n", h.String())
}
}
})

858
prompb/types.pb.go

File diff suppressed because it is too large Load Diff

54
prompb/types.proto

@ -86,40 +86,46 @@ message Histogram {
uint64 zero_count_int = 6;
double zero_count_float = 7;
}
Buckets negative_buckets = 8;
Buckets positive_buckets = 9;
ResetHint reset_hint = 10;
// Negative Buckets.
repeated BucketSpan negative_spans = 8;
// Use either "negative_deltas" or "negative_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double negative_counts = 10; // Absolute count of each bucket.
// Positive Buckets.
repeated BucketSpan positive_spans = 11;
// Use either "positive_deltas" or "positive_counts", the former for
// regular histograms with integer counts, the latter for float
// histograms.
repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double positive_counts = 13; // Absolute count of each bucket.
ResetHint reset_hint = 14;
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 11;
int64 timestamp = 15;
}
// Sparse buckets.
message Buckets {
// A Span is a given number of consecutive buckets at a given
// offset. Logically, it would be more straightforward to include
// the bucket counts in the Span. However, the protobuf
// representation is more compact in the way the data is structured
// here (with all the buckets in a single array separate from the
// Spans).
message Span {
sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
uint32 length = 2; // Length of consecutive buckets.
}
repeated Span span = 1;
// Only one of "delta" or "count" may be used, the former for regular
// histograms with integer counts, the latter for float histograms.
repeated sint64 delta = 2; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
repeated double count = 3; // Absolute count of each bucket.
// A BucketSpan defines a number of consecutive buckets with their
// offset. Logically, it would be more straightforward to include the
// bucket counts in the Span. However, the protobuf representation is
// more compact in the way the data is structured here (with all the
// buckets in a single array separate from the Spans).
message BucketSpan {
sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
uint32 length = 2; // Length of consecutive buckets.
}
// TimeSeries represents samples and labels for a single time series.
message TimeSeries {
// For a timeseries to be valid, and for the samples and exemplars
// to be ingested by the remote system properly, the labels field is required.
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Label labels = 1 [(gogoproto.nullable) = false];
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
}

60
storage/remote/codec.go

@ -502,28 +502,24 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar {
}
}
func HistogramProtoToHistogram(hp prompb.Histogram) histogram.Histogram {
h := histogram.Histogram{
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
return &histogram.Histogram{
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveBuckets: hp.PositiveBuckets.GetDelta(),
NegativeBuckets: hp.NegativeBuckets.GetDelta(),
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
if hp.PositiveBuckets != nil {
h.PositiveSpans = spansProtoToSpans(hp.PositiveBuckets.Span)
}
if hp.NegativeBuckets != nil {
h.NegativeSpans = spansProtoToSpans(hp.NegativeBuckets.Span)
}
return h
}
func spansProtoToSpans(s []*prompb.Buckets_Span) []histogram.Span {
func spansProtoToSpans(s []*prompb.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
@ -534,27 +530,23 @@ func spansProtoToSpans(s []*prompb.Buckets_Span) []histogram.Span {
func histogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeBuckets: &prompb.Buckets{
Span: spansToSpansProto(h.NegativeSpans),
Delta: h.NegativeBuckets,
},
PositiveBuckets: &prompb.Buckets{
Span: spansToSpansProto(h.PositiveSpans),
Delta: h.PositiveBuckets,
},
Timestamp: timestamp,
}
}
func spansToSpansProto(s []histogram.Span) []*prompb.Buckets_Span {
spans := make([]*prompb.Buckets_Span, len(s))
Count: &prompb.Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
Timestamp: timestamp,
}
}
func spansToSpansProto(s []histogram.Span) []*prompb.BucketSpan {
spans := make([]*prompb.BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = &prompb.Buckets_Span{Offset: s[i].Offset, Length: s[i].Length}
spans[i] = &prompb.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}
return spans

2
storage/remote/write_handler.go

@ -120,7 +120,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
for _, hp := range ts.Histograms {
hs := HistogramProtoToHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, &hs)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, hs)
if err != nil {
unwrappedErr := errors.Unwrap(err)
// Althogh AppendHistogram does not currently return ErrDuplicateSampleForTimestamp there is

2
storage/remote/write_handler_test.go

@ -66,7 +66,7 @@ func TestRemoteWriteHandler(t *testing.T) {
for _, hp := range ts.Histograms {
h := HistogramProtoToHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, &h}, appendable.histograms[k])
require.Equal(t, mockHistogram{labels, hp.Timestamp, h}, appendable.histograms[k])
k++
}
}

Loading…
Cancel
Save