diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go
index 67bdf1467..3c4d4aab6 100644
--- a/tsdb/chunkenc/histo.go
+++ b/tsdb/chunkenc/histo.go
@@ -70,8 +70,6 @@ const ()
 // observation 2    delta   delta   delta   xor   []delta   []delta
 // observation >2   dod     dod     dod     xor   []dod     []dod
 // TODO zerothreshold
-// TODO: encode schema and spans metadata in the chunk
-// TODO: decode-recode chunk when new spans appear
 type HistoChunk struct {
     b bstream
 }
@@ -165,8 +163,20 @@ func countSpans(spans []histogram.Span) int {
     return cnt
 }
 
+func newHistoIterator(b []byte) *histoIterator {
+    it := &histoIterator{
+        br:       newBReader(b),
+        numTotal: binary.BigEndian.Uint16(b),
+        t:        math.MinInt64,
+    }
+    // The first 2 bytes contain chunk headers.
+    // We skip that for actual samples.
+    _, _ = it.br.readBits(16)
+    return it
+}
+
 func (c *HistoChunk) iterator(it Iterator) *histoIterator {
-    // TODO fix this. this is taken from xor.go
+    // TODO fix this. this is taken from xor.go // dieter not sure what the purpose of this is
     // Should iterators guarantee to act on a copy of the data so it doesn't lock append?
     // When using striped locks to guard access to chunks, probably yes.
     // Could only copy data if the chunk is not completed yet.
@@ -174,14 +184,7 @@ func (c *HistoChunk) iterator(it Iterator) *histoIterator {
     // histoIter.Reset(c.b.bytes())
     // return histoIter
     //}
-
-    return &histoIterator{
-        // The first 2 bytes contain chunk headers.
-        // We skip that for actual samples.
-        br:       newBReader(c.b.bytes()[2:]),
-        numTotal: binary.BigEndian.Uint16(c.b.bytes()),
-        t:        math.MinInt64,
-    }
+    return newHistoIterator(c.b.bytes())
 }
 
 // Iterator implements the Chunk interface.
@@ -264,6 +267,20 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
             putVarint(a.b, a.buf64, buck)
         }
     case 1:
+        // TODO if zerobucket thresh or schema is different, we should create a new chunk
+        posInterjections, _ := compareSpans(a.posSpans, h.PositiveSpans)
+        //if !ok {
+        // TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
+        //}
+        negInterjections, _ := compareSpans(a.negSpans, h.NegativeSpans)
+        //if !ok {
+        // TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
+        //}
+        if len(posInterjections) > 0 || len(negInterjections) > 0 {
+            // new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one.
+            a.recode(posInterjections, negInterjections, h.PositiveSpans, h.NegativeSpans)
+        }
+
         tDelta = t - a.t
         cntDelta = int64(h.Count) - int64(a.cnt)
         zcntDelta = int64(h.ZeroCount) - int64(a.zcnt)
@@ -285,6 +302,19 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
             a.negbucketsDelta[i] = delta
         }
     default:
+        // TODO if zerobucket thresh or schema is different, we should create a new chunk
+        posInterjections, _ := compareSpans(a.posSpans, h.PositiveSpans)
+        //if !ok {
+        // TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
+        //}
+        negInterjections, _ := compareSpans(a.negSpans, h.NegativeSpans)
+        //if !ok {
+        // TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
+        //}
+        if len(posInterjections) > 0 || len(negInterjections) > 0 {
+            // new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one.
+            a.recode(posInterjections, negInterjections, h.PositiveSpans, h.NegativeSpans)
+        }
         tDelta = t - a.t
         cntDelta = int64(h.Count) - int64(a.cnt)
         zcntDelta = int64(h.ZeroCount) - int64(a.zcnt)
@@ -329,6 +359,43 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
 
 }
 
+// recode converts the current chunk to accommodate an expansion of the set of
+// (positive and/or negative) buckets used, according to the provided interjections, resulting in
+// the honoring of the provided new posSpans and negSpans
+// note: the decode-recode can probably be done more efficiently, but that's for a future optimization
+func (a *histoAppender) recode(posInterjections, negInterjections []interjection, posSpans, negSpans []histogram.Span) {
+    it := newHistoIterator(a.b.bytes())
+    app, err := NewHistoChunk().Appender()
+    if err != nil {
+        panic(err)
+    }
+    numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans)
+    posbuckets := make([]int64, numPosBuckets) // new (modified) histogram buckets
+    negbuckets := make([]int64, numNegBuckets) // new (modified) histogram buckets
+
+    for it.Next() {
+        tOld, hOld := it.AtHistogram()
+        // save the modified histogram to the new chunk
+        hOld.PositiveSpans, hOld.NegativeSpans = posSpans, negSpans
+        if len(posInterjections) > 0 {
+            hOld.PositiveBuckets = interject(hOld.PositiveBuckets, posbuckets, posInterjections)
+        }
+        if len(negInterjections) > 0 {
+            hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negbuckets, negInterjections)
+        }
+        // there is no risk of infinite recursion here as all histograms get appended with the same schema (number of buckets)
+        app.AppendHistogram(tOld, hOld)
+    }
+
+    // adopt the new appender into ourselves
+    // we skip porting some fields like schema, t, cnt and zcnt, sum because they didn't change between our old chunk and the recoded one
+    app2 := app.(*histoAppender)
+    a.b = app2.b
+    a.posSpans, a.negSpans = posSpans, negSpans
+    a.posbuckets, a.negbuckets = app2.posbuckets, app2.negbuckets
+    a.posbucketsDelta, a.negbucketsDelta = app2.posbucketsDelta, app2.negbucketsDelta
+}
+
 func (a *histoAppender) writeSumDelta(v float64) {
     vDelta := math.Float64bits(v) ^ math.Float64bits(a.sum)
 
diff --git a/tsdb/chunkenc/histo_meta.go b/tsdb/chunkenc/histo_meta.go
index 0b5877448..ac1d203ad 100644
--- a/tsdb/chunkenc/histo_meta.go
+++ b/tsdb/chunkenc/histo_meta.go
@@ -80,3 +80,161 @@ func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) {
     }
     return spans, nil
 }
+
+type bucketIterator struct {
+    spans  []histogram.Span
+    span   int // span position of last yielded bucket
+    bucket int // bucket position within span of last yielded bucket
+    idx    int // bucket index (globally across all spans) of last yielded bucket
+}
+
+func newBucketIterator(spans []histogram.Span) *bucketIterator {
+    b := bucketIterator{
+        spans:  spans,
+        span:   0,
+        bucket: -1,
+        idx:    -1,
+    }
+    if len(spans) > 0 {
+        b.idx += int(spans[0].Offset)
+    }
+    return &b
+}
+
+func (b *bucketIterator) Next() (int, bool) {
+    // we're already out of bounds
+    if b.span >= len(b.spans) {
+        return 0, false
+    }
+try:
+    if b.bucket < int(b.spans[b.span].Length-1) { // try to move within same span.
+        b.bucket++
+        b.idx++
+        return b.idx, true
+    } else if b.span < len(b.spans)-1 { // try to move from one span to the next
+        b.span++
+        b.idx += int(b.spans[b.span].Offset + 1)
+        b.bucket = 0
+        if b.spans[b.span].Length == 0 {
+            // pathological case that should never happen. We can't use this span, let's try again.
+            goto try
+        }
+        return b.idx, true
+    }
+    // we're out of options
+    return 0, false
+}
+
+// interjection describes that num new buckets are introduced before processing the pos'th delta from the original slice
+type interjection struct {
+    pos int
+    num int
+}
+
+// compareSpans returns the interjections to convert a slice of deltas to a new slice representing an expanded set of buckets, or false if incompatible (e.g. if buckets were removed)
+// For example:
+// Let's say the old buckets look like this:
+// span syntax: [offset, length]
+// spans      : [ 0 , 2 ]           [2,1]                [ 3 , 2 ]                     [3,1]      [1,1]
+// bucket idx :   [0]  [1]    2    3  [4]    5    6    7  [8]  [9]   10   11   12 [13]   14 [15]
+// raw values      6    3              3                   2    4                   5         1
+// deltas          6   -3              0                  -1    2                   1        -4
+
+// But now we introduce a new bucket layout. (carefully chosen example where we have a span appended, one unchanged[*], one prepended, and two merged - in that order)
+// [*] unchanged in terms of which bucket indices they represent. but to achieve that, their offset needs to change if "disrupted" by spans changing ahead of them
+//                                   \/ this one is "unchanged"
+// spans      : [ 0 , 3 ]           [1,1]      [    1    ,    4    ]               [   3   ,   3   ]
+// bucket idx :   [0]  [1]  [2]    3  [4]    5  [6]  [7]  [8]  [9]   10   11   12 [13] [14] [15]
+// raw values      6    3    0         3         0    0    2    4                   5    0    1
+// deltas          6   -3   -3         3        -3    0    2    2                   1   -5    1
+// delta mods:               / \                 / \                                     / \
+// note that whenever any new buckets are introduced, the subsequent "old" bucket needs to readjust its delta to the new base of 0
+// thus, for the caller, who wants to transform the set of original deltas to a new set of deltas to match a new span layout that adds buckets, we simply
+// need to generate a list of interjections
+// note: within compareSpans we don't have to worry about the changes to the spans themselves,
+// thanks to the iterators, we get to work with the more useful bucket indices (which of course directly correspond to the buckets we have to adjust)
+func compareSpans(a, b []histogram.Span) ([]interjection, bool) {
+    ai := newBucketIterator(a)
+    bi := newBucketIterator(b)
+
+    var interjections []interjection
+
+    // when inter.num becomes > 0, this becomes a valid interjection that should be yielded when we finish a streak of new buckets
+    var inter interjection
+
+    av, aok := ai.Next()
+    bv, bok := bi.Next()
+    for {
+        if aok && bok {
+            if av == bv { // both have an identical value. move on!
+                // finish WIP interjection and reset
+                if inter.num > 0 {
+                    interjections = append(interjections, inter)
+                }
+                inter.num = 0
+                av, aok = ai.Next()
+                bv, bok = bi.Next()
+                if aok {
+                    inter.pos++
+                }
+                continue
+            }
+            if av < bv { // b misses a value that is in a.
+                return interjections, false
+            }
+            if av > bv { // a misses a value that is in b. forward b and recompare
+                inter.num++
+                bv, bok = bi.Next()
+                continue
+            }
+        } else if aok && !bok { // b misses a value that is in a.
+            return interjections, false
+        } else if !aok && bok { // a misses a value that is in b. forward b and recompare
+            inter.num++
+            bv, bok = bi.Next()
+            continue
+        } else { // both iterators ran out. we're done
+            if inter.num > 0 {
+                interjections = append(interjections, inter)
+            }
+            break
+        }
+    }
+
+    return interjections, true
+}
+
+// caller is responsible for making sure len(in) and len(out) are appropriate for the provided interjections!
+func interject(in, out []int64, interjections []interjection) []int64 {
+    var j int      // position in out
+    var v int64    // the last value seen
+    var interj int // the next interjection to process
+    for i, d := range in {
+        if interj < len(interjections) && i == interjections[interj].pos {
+
+            // we have an interjection!
+            // add interjection.num new delta values such that their bucket values equate to 0
+            out[j] = int64(-v)
+            j++
+            for x := 1; x < interjections[interj].num; x++ {
+                out[j] = 0
+                j++
+            }
+            interj++
+
+            // now save the value from the input. the delta value we should save is
+            // the original delta value + the last value of the point before the interjection (to undo the delta that was introduced by the interjection)
+            out[j] = d + v
+            j++
+            v = d + v
+            continue
+        }
+
+        // if there was no interjection, the original delta is still valid
+        out[j] = d
+        j++
+        v += d
+    }
+
+    return out
+}
diff --git a/tsdb/chunkenc/histo_meta_test.go b/tsdb/chunkenc/histo_meta_test.go
new file mode 100644
index 000000000..ae5b202d9
--- /dev/null
+++ b/tsdb/chunkenc/histo_meta_test.go
@@ -0,0 +1,159 @@
+// Copyright 2021 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// The code in this file was largely written by Damian Gryski as part of
+// https://github.com/dgryski/go-tsz and published under the license below.
+// It was modified to accommodate reading from byte slices without modifying
+// the underlying bytes, which would panic when reading from mmap'd
+// read-only byte slices.
+package chunkenc
+
+import (
+    "testing"
+
+    "github.com/prometheus/prometheus/pkg/histogram"
+    "github.com/stretchr/testify/require"
+)
+
+// example of a span layout and resulting bucket indices (_idx_ is used in this histogram, others are shown just for context)
+// spans      : [offset: 0, length: 2]  [offset: 1, length: 1]
+// bucket idx :   _0_  _1_    2    [3]    4 ...
+
+func TestBucketIterator(t *testing.T) {
+    type test struct {
+        spans []histogram.Span
+        idxs  []int
+    }
+    tests := []test{
+        {
+            spans: []histogram.Span{
+                {
+                    Offset: 0,
+                    Length: 1,
+                },
+            },
+            idxs: []int{0},
+        },
+        {
+            spans: []histogram.Span{
+                {
+                    Offset: 0,
+                    Length: 2,
+                },
+                {
+                    Offset: 1,
+                    Length: 1,
+                },
+            },
+            idxs: []int{0, 1, 3},
+        },
+        {
+            spans: []histogram.Span{
+                {
+                    Offset: 100,
+                    Length: 4,
+                },
+                {
+                    Offset: 8,
+                    Length: 7,
+                },
+                {
+                    Offset: 0,
+                    Length: 1,
+                },
+            },
+            idxs: []int{100, 101, 102, 103, 112, 113, 114, 115, 116, 117, 118, 119},
+        },
+        // the below 2 sets are the ones described in compareSpans's comments
+        {
+            spans: []histogram.Span{
+                {Offset: 0, Length: 2},
+                {Offset: 2, Length: 1},
+                {Offset: 3, Length: 2},
+                {Offset: 3, Length: 1},
+                {Offset: 1, Length: 1},
+            },
+            idxs: []int{0, 1, 4, 8, 9, 13, 15},
+        },
+        {
+            spans: []histogram.Span{
+                {Offset: 0, Length: 3},
+                {Offset: 1, Length: 1},
+                {Offset: 1, Length: 4},
+                {Offset: 3, Length: 3},
+            },
+            idxs: []int{0, 1, 2, 4, 6, 7, 8, 9, 13, 14, 15},
+        },
+    }
+    for _, test := range tests {
+        b := newBucketIterator(test.spans)
+        var got []int
+        v, ok := b.Next()
+        for ok {
+            got = append(got, v)
+            v, ok = b.Next()
+        }
+        require.Equal(t, test.idxs, got)
+    }
+}
+
+func TestInterjection(t *testing.T) {
+    // this tests the scenario as described in compareSpans's comments
+    a := []histogram.Span{
+        {Offset: 0, Length: 2},
+        {Offset: 2, Length: 1},
+        {Offset: 3, Length: 2},
+        {Offset: 3, Length: 1},
+        {Offset: 1, Length: 1},
+    }
+    b := []histogram.Span{
+        {Offset: 0, Length: 3},
+        {Offset: 1, Length: 1},
+        {Offset: 1, Length: 4},
+        {Offset: 3, Length: 3},
+    }
+    interj := []interjection{
+        {
+            pos: 2,
+            num: 1,
+        },
+        {
+            pos: 3,
+            num: 2,
+        },
+        {
+            pos: 6,
+            num: 1,
+        },
+    }
+    testCompareSpans(a, b, interj, t)
+    testInterject(interj, t)
+}
+
+func testCompareSpans(a, b []histogram.Span, exp []interjection, t *testing.T) {
+    got, ok := compareSpans(a, b)
+    require.Equal(t, true, ok)
+    require.Equal(t, exp, got)
+}
+
+func testInterject(interjections []interjection, t *testing.T) {
+    // this tests the scenario as described in compareSpans's comments
+    // original deltas that represent these counts         : 6, 3, 3, 2, 4, 5, 1
+    a := []int64{6, -3, 0, -1, 2, 1, -4}
+    // modified deltas to represent the interjected counts : 6, 3, 0, 3, 0, 0, 2, 4, 5, 0, 1
+    exp := []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}
+    b := make([]int64, len(a)+4)
+    interject(a, b, interjections)
+    require.Equal(t, exp, b)
+
+}
diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go
index 36d429a11..42af9aba8 100644
--- a/tsdb/chunkenc/histo_test.go
+++ b/tsdb/chunkenc/histo_test.go
@@ -33,7 +33,7 @@ func TestHistoChunkSameBuckets(t *testing.T) {
 
     app, err := c.Appender()
     require.NoError(t, err)
-    require.Equal(t, c.NumSamples(), 0)
+    require.Equal(t, 0, c.NumSamples())
 
     ts := int64(1234567890)
 
@@ -53,7 +53,7 @@
     }
 
     app.AppendHistogram(ts, h)
-    require.Equal(t, c.NumSamples(), 1)
+    require.Equal(t, 1, c.NumSamples())
 
     exp := []res{
         {t: ts, h: h},
@@ -70,13 +70,13 @@ func TestHistoChunkSameBuckets(t *testing.T) {
 
     app.AppendHistogram(ts, h)
     exp = append(exp, res{t: ts, h: h})
-    require.Equal(t, c.NumSamples(), 2)
+    require.Equal(t, 2, c.NumSamples())
 
     // add update with new appender
     app, err = c.Appender()
     require.NoError(t, err)
 
-    require.Equal(t, c.NumSamples(), 2)
+    require.Equal(t, 2, c.NumSamples())
 
     ts += 14
     h.Count += 13
@@ -87,7 +87,7 @@ func TestHistoChunkSameBuckets(t *testing.T) {
     app.AppendHistogram(ts, h)
     exp = append(exp, res{t: ts, h: h})
-    require.Equal(t, c.NumSamples(), 3)
+    require.Equal(t, 3, c.NumSamples())
 
     // 1. Expand iterator in simple case.
     it1 := c.iterator(nil)
@@ -130,3 +130,85 @@ func TestHistoChunkSameBuckets(t *testing.T) {
     // require.Equal(t, exp[mid:], res3)
     // require.Equal(t, false, it3.Seek(exp[len(exp)-1].t+1))
 }
+
+// mimics the scenario described for compareSpans()
+func TestHistoChunkBucketChanges(t *testing.T) {
+
+    c := NewHistoChunk()
+
+    type res struct {
+        t int64
+        h histogram.SparseHistogram
+    }
+
+    // create fresh appender and add the first histogram
+
+    app, err := c.Appender()
+    require.NoError(t, err)
+    require.Equal(t, 0, c.NumSamples())
+
+    ts1 := int64(1234567890)
+
+    h1 := histogram.SparseHistogram{
+        Count:     5,
+        ZeroCount: 2,
+        Sum:       18.4,
+        //ZeroThreshold: 1, TODO
+        Schema: 1,
+        PositiveSpans: []histogram.Span{
+            {Offset: 0, Length: 2},
+            {Offset: 2, Length: 1},
+            {Offset: 3, Length: 2},
+            {Offset: 3, Length: 1},
+            {Offset: 1, Length: 1},
+        },
+        PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
+        NegativeSpans:   nil,
+        NegativeBuckets: []int64{},
+    }
+
+    app.AppendHistogram(ts1, h1)
+    require.Equal(t, 1, c.NumSamples())
+
+    // add a new histogram that has expanded buckets
+
+    ts2 := ts1 + 16
+    h2 := h1
+    h2.PositiveSpans = []histogram.Span{
+        {Offset: 0, Length: 3},
+        {Offset: 1, Length: 1},
+        {Offset: 1, Length: 4},
+        {Offset: 3, Length: 3},
+    }
+    h2.Count += 9
+    h2.ZeroCount++
+    h2.Sum = 30
+    // existing histogram should get values converted from the above to: 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between)
+    // so the new histogram should have new counts >= these per-bucket counts, e.g.:
+    h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
+
+    app.AppendHistogram(ts2, h2)
+
+    // TODO is this okay?
+    // the appender can rewrite its own bytes slice but it is not able to update the HistoChunk, so our HistoChunk is outdated until we update it manually
+    c.b = *(app.(*histoAppender).b)
+    require.Equal(t, 2, c.NumSamples())
+
+    // because the 2nd histogram has expanded buckets, we should expect all histograms (in particular the first)
+    // to come back using the new spans metadata as well as the expanded buckets
+    h1.PositiveSpans = h2.PositiveSpans
+    h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}
+    exp := []res{
+        {t: ts1, h: h1},
+        {t: ts2, h: h2},
+    }
+    it1 := c.iterator(nil)
+    require.NoError(t, it1.Err())
+    var res1 []res
+    for it1.Next() {
+        ts, h := it1.AtHistogram()
+        res1 = append(res1, res{t: ts, h: h.Copy()})
+    }
+    require.NoError(t, it1.Err())
+    require.Equal(t, exp, res1)
+}
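
Note (not part of the patch): the delta fix-up that interject() performs can be illustrated standalone. The sketch below is a hypothetical, simplified single-interjection variant of the same arithmetic, written as its own program; widen() and its parameters are illustrative names, not symbols from the chunkenc package. It shows why the first inserted bucket gets delta -v (dropping the running count back to 0) and the following original bucket gets d + v (re-establishing its old absolute value).

package main

import "fmt"

// widen inserts num empty buckets before position pos of a delta-encoded
// bucket slice and adjusts the surrounding deltas so that the decoded
// absolute counts of the original buckets are unchanged.
func widen(deltas []int64, pos, num int) []int64 {
    out := make([]int64, 0, len(deltas)+num)
    var v int64 // running absolute value of the last original bucket seen
    for i, d := range deltas {
        if i == pos {
            out = append(out, -v) // first new bucket: drop the count back to 0
            for x := 1; x < num; x++ {
                out = append(out, 0) // further new buckets stay at 0
            }
            out = append(out, d+v) // original bucket, re-established from 0
            v += d
            continue
        }
        out = append(out, d) // untouched: the original delta is still valid
        v += d
    }
    return out
}

func main() {
    // deltas for counts 6 3 3 2 4 5 1, the old layout from the compareSpans example
    deltas := []int64{6, -3, 0, -1, 2, 1, -4}
    // one empty bucket interjected before position 2 yields counts 6 3 0 3 2 4 5 1
    fmt.Println(widen(deltas, 2, 1)) // [6 -3 -3 3 -1 2 1 -4]
}

The interject() in the patch does the same per interjection but writes into a caller-sized output slice and handles a whole list of interjections in one pass, which is why it also tracks the index of the next interjection to process.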