mirror of https://github.com/prometheus/prometheus
parent
a1087ed37a
commit
518b77c59d
|
@ -72,7 +72,6 @@ const ()
|
||||||
// TODO zerothreshold
|
// TODO zerothreshold
|
||||||
// TODO: encode schema and spans metadata in the chunk
|
// TODO: encode schema and spans metadata in the chunk
|
||||||
// TODO: decode-recode chunk when new spans appear
|
// TODO: decode-recode chunk when new spans appear
|
||||||
|
|
||||||
type HistoChunk struct {
|
type HistoChunk struct {
|
||||||
b bstream
|
b bstream
|
||||||
}
|
}
|
||||||
|
@ -193,12 +192,14 @@ func (c *HistoChunk) Iterator(it Iterator) Iterator {
|
||||||
type histoAppender struct {
|
type histoAppender struct {
|
||||||
b *bstream
|
b *bstream
|
||||||
|
|
||||||
// Meta
|
// Metadata:
|
||||||
schema int32
|
schema int32
|
||||||
posSpans, negSpans []histogram.Span
|
posSpans, negSpans []histogram.Span
|
||||||
|
|
||||||
// for the fields that are tracked as dod's
|
// For the fields that are tracked as dod's.
|
||||||
// note that we expect to handle negative deltas (e.g. resets) by creating new chunks, we still want to support it in general hence signed integer types
|
// Note that we expect to handle negative deltas (e.g. resets) by
|
||||||
|
// creating new chunks, we still want to support it in general hence
|
||||||
|
// signed integer types.
|
||||||
t int64
|
t int64
|
||||||
cnt, zcnt uint64
|
cnt, zcnt uint64
|
||||||
tDelta, cntDelta, zcntDelta int64
|
tDelta, cntDelta, zcntDelta int64
|
||||||
|
@ -206,12 +207,12 @@ type histoAppender struct {
|
||||||
posbuckets, negbuckets []int64
|
posbuckets, negbuckets []int64
|
||||||
posbucketsDelta, negbucketsDelta []int64
|
posbucketsDelta, negbucketsDelta []int64
|
||||||
|
|
||||||
// for the fields that are gorilla xor coded
|
// The sum is Gorilla xor encoded.
|
||||||
sum float64
|
sum float64
|
||||||
leading uint8
|
leading uint8
|
||||||
trailing uint8
|
trailing uint8
|
||||||
|
|
||||||
buf64 []byte // for working on varint64's
|
buf64 []byte // For working on varint64's.
|
||||||
}
|
}
|
||||||
|
|
||||||
func putVarint(b *bstream, buf []byte, x int64) {
|
func putVarint(b *bstream, buf []byte, x int64) {
|
||||||
|
@ -227,16 +228,18 @@ func putUvarint(b *bstream, buf []byte, x uint64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *histoAppender) Append(int64, float64) {
|
func (a *histoAppender) Append(int64, float64) {
|
||||||
panic("cannot call histoAppender.Append().")
|
panic("cannot call histoAppender.Append()")
|
||||||
}
|
}
|
||||||
|
|
||||||
// AppendHistogram appends a SparseHistogram to the chunk
|
// AppendHistogram appends a SparseHistogram to the chunk. We assume the
|
||||||
// we assume the histogram is properly structured. E.g. that the number pos/neg buckets used corresponds to the number conveyed by the pos/neg span structures
|
// histogram is properly structured. E.g. that the number of pos/neg buckets
|
||||||
|
// used corresponds to the number conveyed by the pos/neg span structures.
|
||||||
func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
|
func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
|
||||||
var tDelta, cntDelta, zcntDelta int64
|
var tDelta, cntDelta, zcntDelta int64
|
||||||
num := binary.BigEndian.Uint16(a.b.bytes())
|
num := binary.BigEndian.Uint16(a.b.bytes())
|
||||||
|
|
||||||
if num == 0 {
|
switch num {
|
||||||
|
case 0:
|
||||||
// the first append gets the privilege to dictate the metadata
|
// the first append gets the privilege to dictate the metadata
|
||||||
// but it's also responsible for encoding it into the chunk!
|
// but it's also responsible for encoding it into the chunk!
|
||||||
|
|
||||||
|
@ -260,7 +263,7 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
|
||||||
for _, buck := range h.NegativeBuckets {
|
for _, buck := range h.NegativeBuckets {
|
||||||
putVarint(a.b, a.buf64, buck)
|
putVarint(a.b, a.buf64, buck)
|
||||||
}
|
}
|
||||||
} else if num == 1 {
|
case 1:
|
||||||
tDelta = t - a.t
|
tDelta = t - a.t
|
||||||
cntDelta = int64(h.Count) - int64(a.cnt)
|
cntDelta = int64(h.Count) - int64(a.cnt)
|
||||||
zcntDelta = int64(h.ZeroCount) - int64(a.zcnt)
|
zcntDelta = int64(h.ZeroCount) - int64(a.zcnt)
|
||||||
|
@ -281,7 +284,7 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
|
||||||
putVarint(a.b, a.buf64, delta)
|
putVarint(a.b, a.buf64, delta)
|
||||||
a.negbucketsDelta[i] = delta
|
a.negbucketsDelta[i] = delta
|
||||||
}
|
}
|
||||||
} else {
|
default:
|
||||||
tDelta = t - a.t
|
tDelta = t - a.t
|
||||||
cntDelta = int64(h.Count) - int64(a.cnt)
|
cntDelta = int64(h.Count) - int64(a.cnt)
|
||||||
zcntDelta = int64(h.ZeroCount) - int64(a.zcnt)
|
zcntDelta = int64(h.ZeroCount) - int64(a.zcnt)
|
||||||
|
|
|
@ -43,8 +43,13 @@
|
||||||
|
|
||||||
package chunkenc
|
package chunkenc
|
||||||
|
|
||||||
// putInt64VBBucket writes an int64 using varbit optimized for SHS buckets
|
// putInt64VBBucket writes an int64 using varbit optimized for SHS buckets.
|
||||||
// note: we could improve this further: each branch doesn't need to support any values of any of the prior branches, so we can expand the range of each branch. do more with fewer bits
|
//
|
||||||
|
// TODO(Dieterbe): We could improve this further: Each branch doesn't need to
|
||||||
|
// support any values of any of the prior branches. So we can expand the range
|
||||||
|
// of each branch. Do more with fewer bits. It comes at the price of more
|
||||||
|
// expensive encoding and decoding (cutting out and later adding back that
|
||||||
|
// center-piece we skip).
|
||||||
func putInt64VBBucket(b *bstream, val int64) {
|
func putInt64VBBucket(b *bstream, val int64) {
|
||||||
switch {
|
switch {
|
||||||
case val == 0:
|
case val == 0:
|
||||||
|
|
Loading…
Reference in New Issue