Merge pull request #9439 from codesome/counterresetmeta

Add info about counter resets in chunk meta
Björn Rabenstein committed 3 years ago via GitHub
commit 96f0683318

@@ -77,7 +77,7 @@ type HistoChunk struct {
// NewHistoChunk returns a new chunk with Histo encoding of the given size.
func NewHistoChunk() *HistoChunk {
b := make([]byte, 2, 128)
b := make([]byte, 3, 128)
return &HistoChunk{b: bstream{stream: b, count: 0}}
}
@@ -106,6 +106,32 @@ func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span,
return readHistoChunkMeta(&b)
}
// CounterResetHeader defines the first 2 bits of the chunk header.
type CounterResetHeader byte
const (
CounterReset CounterResetHeader = 0b10000000
NotCounterReset CounterResetHeader = 0b01000000
GaugeType CounterResetHeader = 0b11000000
UnknownCounterReset CounterResetHeader = 0b00000000
)
// SetCounterResetHeader sets the counter reset header.
func (c *HistoChunk) SetCounterResetHeader(h CounterResetHeader) {
switch h {
case CounterReset, NotCounterReset, GaugeType, UnknownCounterReset:
bytes := c.Bytes()
bytes[2] = (bytes[2] & 0b00111111) | byte(h)
default:
panic("invalid CounterResetHeader type")
}
}
// GetCounterResetHeader returns the info about the first 2 bits of the chunk header.
func (c *HistoChunk) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(c.Bytes()[2] & 0b11000000)
}
// Compact implements the Chunk interface.
func (c *HistoChunk) Compact() {
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
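
A minimal standalone sketch of how these two header bits behave, outside the diff itself (the constants are copied from above; the idea that the first two chunk bytes hold the sample count is an assumption for illustration only):

package main

import "fmt"

// Copies of the constants introduced above, so the sketch compiles on its own.
type CounterResetHeader byte

const (
	CounterReset        CounterResetHeader = 0b10000000
	NotCounterReset     CounterResetHeader = 0b01000000
	GaugeType           CounterResetHeader = 0b11000000
	UnknownCounterReset CounterResetHeader = 0b00000000
)

func main() {
	// Hypothetical chunk prefix: two bytes of sample count (assumption for
	// illustration), then the byte that carries the counter-reset bits.
	b := []byte{0x00, 0x05, 0b00001010}

	// Setting the header only touches the top two bits of bytes[2].
	b[2] = (b[2] & 0b00111111) | byte(CounterReset)
	fmt.Printf("%08b\n", b[2]) // 10001010

	// Reading masks the lower six bits back off.
	fmt.Println(CounterResetHeader(b[2]&0b11000000) == CounterReset) // true

	// GaugeType is simply both bits set at once.
	fmt.Println(GaugeType == CounterReset|NotCounterReset) // true
}

Since UnknownCounterReset is the all-zeros state, a freshly allocated chunk (the 3-byte slice from NewHistoChunk above) starts out as "unknown" until the head explicitly sets the header, as done in head.go further down.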
@@ -249,42 +275,56 @@ func (a *HistoAppender) Append(int64, float64) {}
// * any buckets disappeared
// * there was a counter reset in the count of observations or in any bucket, including the zero bucket
// * the last sample in the chunk was stale while the current sample is not stale
// It returns an additional boolean set to true if it is not appendable because of a counter reset.
// If the given sample is stale, it will always return true.
func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) {
// If counterReset is true, okToAppend MUST be false.
func (a *HistoAppender) Appendable(h histogram.SparseHistogram) (posInterjections []Interjection, negInterjections []Interjection, okToAppend bool, counterReset bool) {
if value.IsStaleNaN(h.Sum) {
// This is a stale sample whose buckets and spans don't matter.
return nil, nil, true
okToAppend = true
return
}
if value.IsStaleNaN(a.sum) {
// If the last sample was stale, then we can only accept stale samples in this chunk.
return nil, nil, false
return
}
if h.Count < a.cnt {
// There has been a counter reset.
counterReset = true
return
}
if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold {
return nil, nil, false
return
}
posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans)
if !ok {
return nil, nil, false
if h.ZeroCount < a.zcnt {
// There has been a counter reset since ZeroThreshold didn't change.
counterReset = true
return
}
negInterjections, ok := compareSpans(a.negSpans, h.NegativeSpans)
var ok bool
posInterjections, ok = compareSpans(a.posSpans, h.PositiveSpans)
if !ok {
return nil, nil, false
counterReset = true
return
}
if h.Count < a.cnt || h.ZeroCount < a.zcnt {
// There has been a counter reset.
return nil, nil, false
negInterjections, ok = compareSpans(a.negSpans, h.NegativeSpans)
if !ok {
counterReset = true
return
}
if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) {
return nil, nil, false
}
if counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) {
return nil, nil, false
if counterResetInAnyBucket(a.posbuckets, h.PositiveBuckets, a.posSpans, h.PositiveSpans) ||
counterResetInAnyBucket(a.negbuckets, h.NegativeBuckets, a.negSpans, h.NegativeSpans) {
counterReset, posInterjections, negInterjections = true, nil, nil
return
}
return posInterjections, negInterjections, ok
okToAppend = true
return
}
// counterResetInAnyBucket returns true if there was a counter reset for any bucket.
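
For orientation, here is a self-contained sketch of how a caller is expected to act on the four return values. The types and helper below are stand-ins so the sketch compiles on its own, not the real chunkenc API; the actual caller is the head.go change further down in this commit:

package main

import "fmt"

// interjection is a stand-in for chunkenc.Interjection, only so this compiles.
type interjection struct{ pos, num int }

// appendable is a placeholder with the same shape as the new Appendable;
// okToAppend and counterReset are never both true.
func appendable() (posIntj, negIntj []interjection, okToAppend, counterReset bool) {
	return nil, nil, false, true // pretend a counter reset was detected
}

func main() {
	posIntj, negIntj, okToAppend, counterReset := appendable()
	switch {
	case !okToAppend:
		// Schema change, vanished buckets, stale transition or a counter
		// reset: cut a new chunk. counterReset distinguishes the reset case,
		// e.g. for setting the new chunk's CounterResetHeader.
		fmt.Println("cut a new chunk; counter reset:", counterReset)
	case len(posIntj) > 0 || len(negIntj) > 0:
		// Compatible, but new buckets appeared: recode the existing chunk
		// to the wider layout, then append.
		fmt.Println("recode, then append")
	default:
		// The chunk already fits this histogram's layout: append directly.
		fmt.Println("append directly")
	}
}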
@@ -476,7 +516,8 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection
// it again with the new span layout. This can probably be done in-place
// by editing the chunk. But let's first see how expensive it is in the
// big picture.
it := newHistoIterator(a.b.bytes())
byts := a.b.bytes()
it := newHistoIterator(byts)
hc := NewHistoChunk()
app, err := hc.Appender()
if err != nil {
@@ -504,6 +545,9 @@ func (a *HistoAppender) Recode(posInterjections, negInterjections []Interjection
}
app.AppendHistogram(tOld, hOld)
}
// Set the flags.
hc.SetCounterResetHeader(CounterResetHeader(byts[2] & 0b11000000))
return hc, app
}
@@ -643,6 +687,7 @@ func (it *histoIterator) Next() bool {
if it.numRead == 0 {
// first read is responsible for reading chunk metadata and initializing fields that depend on it
// We give counter reset info at chunk level, hence we discard it here.
schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br)
if err != nil {
it.err = err

@@ -32,29 +32,34 @@ func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) {
}
}
func readHistoChunkMeta(b *bstreamReader) (int32, float64, []histogram.Span, []histogram.Span, error) {
func readHistoChunkMeta(b *bstreamReader) (schema int32, zeroThreshold float64, posSpans []histogram.Span, negSpans []histogram.Span, err error) {
_, err = b.ReadByte() // The header.
if err != nil {
return
}
v, err := readInt64VBBucket(b)
if err != nil {
return 0, 0, nil, nil, err
return
}
schema := int32(v)
schema = int32(v)
zeroThreshold, err := readFloat64VBBucket(b)
zeroThreshold, err = readFloat64VBBucket(b)
if err != nil {
return 0, 0, nil, nil, err
return
}
posSpans, err := readHistoChunkMetaSpans(b)
posSpans, err = readHistoChunkMetaSpans(b)
if err != nil {
return 0, 0, nil, nil, err
return
}
negSpans, err := readHistoChunkMetaSpans(b)
negSpans, err = readHistoChunkMetaSpans(b)
if err != nil {
return 0, 0, nil, nil, err
return
}
return schema, zeroThreshold, posSpans, negSpans, nil
return
}
func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) {

@@ -163,10 +163,11 @@ func TestHistoChunkBucketChanges(t *testing.T) {
// This is how span changes will be handled.
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
require.Greater(t, len(posInterjections), 0)
require.Equal(t, 0, len(negInterjections))
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
c, app = histoApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
app.AppendHistogram(ts2, h2)
@@ -234,10 +235,11 @@ func TestHistoChunkAppendable(t *testing.T) {
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
require.Greater(t, len(posInterjections), 0)
require.Equal(t, 0, len(negInterjections))
require.True(t, ok) // Only new buckets came in.
require.False(t, cr)
}
{ // New histogram that has a bucket missing.
@@ -252,10 +254,11 @@ func TestHistoChunkAppendable(t *testing.T) {
h2.PositiveBuckets = []int64{6, -3, -1, 2, 1, -4} // counts: 6, 3, 2, 4, 5, 1 (total 21)
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
}
{ // New histogram that has a counter reset while buckets are same.
@@ -264,10 +267,11 @@ func TestHistoChunkAppendable(t *testing.T) {
h2.PositiveBuckets = []int64{6, -4, 1, -1, 2, 1, -4} // counts: 6, 2, 3, 2, 4, 5, 1 (total 23)
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
}
{ // New histogram that has a counter reset while new buckets were added.
@@ -284,10 +288,11 @@ func TestHistoChunkAppendable(t *testing.T) {
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 0} // 7 5 1 3 1 0 2 5 5 0 0 (total 29)
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
}
{ // New histogram that has a counter reset while new buckets were added before the first bucket and reset on first bucket.
@@ -307,9 +312,10 @@ func TestHistoChunkAppendable(t *testing.T) {
h2.PositiveBuckets = []int64{1, 1, 3, -2, 0, -1, 2, 1, -4} // counts: 1, 2, 5, 3, 3, 2, 4, 5, 1 (total 26)
histoApp, _ := app.(*HistoAppender)
posInterjections, negInterjections, ok := histoApp.Appendable(h2)
posInterjections, negInterjections, ok, cr := histoApp.Appendable(h2)
require.Equal(t, 0, len(posInterjections))
require.Equal(t, 0, len(negInterjections))
require.False(t, ok) // Need to cut a new chunk.
require.True(t, cr)
}
}

@@ -605,20 +605,29 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// appendHistogram adds the sparse histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper chunk reference afterwards.
// We check for Appendable before appendPreprocessor because in case it ends up creating a new chunk,
// we need to know if there was also a counter reset or not to set the meta properly.
app, _ := s.app.(*chunkenc.HistoAppender)
var (
posInterjections, negInterjections []chunkenc.Interjection
okToAppend, counterReset bool
)
if app != nil {
posInterjections, negInterjections, okToAppend, counterReset = app.Appendable(sh)
}
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
if !chunkCreated {
// Head controls the execution of recoding, so that we own the proper chunk reference afterwards
app, _ := s.app.(*chunkenc.HistoAppender)
posInterjections, negInterjections, ok := app.Appendable(sh)
// we have 3 cases here
// !ok -> we need to cut a new chunk
// ok but we have interjections -> existing chunk needs recoding before we can append our histogram
// ok and no interjections -> chunk is ready to support our histogram
if !ok {
// We have 3 cases here
// !okToAppend -> we need to cut a new chunk
// okToAppend but we have interjections -> existing chunk needs recoding before we can append our histogram
// okToAppend and no interjections -> chunk is ready to support our histogram
if !okToAppend || counterReset {
c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper)
chunkCreated = true
} else if len(posInterjections) > 0 || len(negInterjections) > 0 {
@@ -633,6 +642,17 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen
}
}
if chunkCreated {
hc := s.headChunk.chunk.(*chunkenc.HistoChunk)
header := chunkenc.UnknownCounterReset
if counterReset {
header = chunkenc.CounterReset
} else if okToAppend {
header = chunkenc.NotCounterReset
}
hc.SetCounterResetHeader(header)
}
s.app.AppendHistogram(t, sh)
s.sparseHistogramSeries = true
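
Restated outside the diff, the header choice for a freshly cut chunk reduces to a small mapping. This is only a sketch of the logic above; the function name and package placement are hypothetical, while chunkenc and its new constants are the ones this commit introduces:

package tsdb // hypothetical placement, illustration only

import "github.com/prometheus/prometheus/tsdb/chunkenc"

// headerForNewChunk restates the decision made above: an observed counter
// reset wins, an otherwise appendable histogram means "definitely not a
// reset" (the chunk was cut for some other reason, e.g. it was full), and
// anything else is left unknown.
func headerForNewChunk(counterReset, okToAppend bool) chunkenc.CounterResetHeader {
	switch {
	case counterReset:
		return chunkenc.CounterReset
	case okToAppend:
		return chunkenc.NotCounterReset
	default:
		return chunkenc.UnknownCounterReset
	}
}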

@@ -717,6 +717,9 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
h histogram.SparseHistogram
)
if p.currDelIter.ChunkEncoding() == chunkenc.EncSHS {
if hc, ok := p.currChkMeta.Chunk.(*chunkenc.HistoChunk); ok {
newChunk.(*chunkenc.HistoChunk).SetCounterResetHeader(hc.GetCounterResetHeader())
}
t, h = p.currDelIter.AtHistogram()
p.curr.MinTime = t
app.AppendHistogram(t, h.Copy())
