mirror of https://github.com/prometheus/prometheus
Merge pull request #14598 from krajorama/fix-compaction-panic
Fix: panic: runtime error: index out of range [4] with length 4pull/14621/head
commit
3a673cd0bc
|
@ -419,6 +419,7 @@ loop:
|
||||||
// fill in the bucket in b and advance a.
|
// fill in the bucket in b and advance a.
|
||||||
if aCount == 0 {
|
if aCount == 0 {
|
||||||
bInter.num++ // Mark that we need to insert a bucket in b.
|
bInter.num++ // Mark that we need to insert a bucket in b.
|
||||||
|
bInter.bucketIdx = aIdx
|
||||||
// Advance a
|
// Advance a
|
||||||
if aInter.num > 0 {
|
if aInter.num > 0 {
|
||||||
aInserts = append(aInserts, aInter)
|
aInserts = append(aInserts, aInter)
|
||||||
|
@ -436,6 +437,7 @@ loop:
|
||||||
return nil, nil, false
|
return nil, nil, false
|
||||||
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
||||||
aInter.num++
|
aInter.num++
|
||||||
|
bInter.bucketIdx = bIdx
|
||||||
// Advance b
|
// Advance b
|
||||||
if bInter.num > 0 {
|
if bInter.num > 0 {
|
||||||
bInserts = append(bInserts, bInter)
|
bInserts = append(bInserts, bInter)
|
||||||
|
@ -453,6 +455,7 @@ loop:
|
||||||
// fill in the bucket in b and advance a.
|
// fill in the bucket in b and advance a.
|
||||||
if aCount == 0 {
|
if aCount == 0 {
|
||||||
bInter.num++
|
bInter.num++
|
||||||
|
bInter.bucketIdx = aIdx
|
||||||
// Advance a
|
// Advance a
|
||||||
if aInter.num > 0 {
|
if aInter.num > 0 {
|
||||||
aInserts = append(aInserts, aInter)
|
aInserts = append(aInserts, aInter)
|
||||||
|
@ -471,6 +474,7 @@ loop:
|
||||||
return nil, nil, false
|
return nil, nil, false
|
||||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||||
aInter.num++
|
aInter.num++
|
||||||
|
bInter.bucketIdx = bIdx
|
||||||
// Advance b
|
// Advance b
|
||||||
if bInter.num > 0 {
|
if bInter.num > 0 {
|
||||||
bInserts = append(bInserts, bInter)
|
bInserts = append(bInserts, bInter)
|
||||||
|
@ -773,6 +777,23 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
|
||||||
happ.appendFloatHistogram(t, h)
|
happ.appendFloatHistogram(t, h)
|
||||||
return newChunk, false, app, nil
|
return newChunk, false, app, nil
|
||||||
}
|
}
|
||||||
|
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
||||||
|
// The histogram needs to be expanded to have the extra empty buckets
|
||||||
|
// of the chunk.
|
||||||
|
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
|
||||||
|
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
|
||||||
|
// However we need to make a copy in case the input is sharing spans from an iterator.
|
||||||
|
h.PositiveSpans = make([]histogram.Span, len(a.pSpans))
|
||||||
|
copy(h.PositiveSpans, a.pSpans)
|
||||||
|
h.NegativeSpans = make([]histogram.Span, len(a.nSpans))
|
||||||
|
copy(h.NegativeSpans, a.nSpans)
|
||||||
|
} else {
|
||||||
|
// Spans need pre-adjusting to accommodate the new buckets.
|
||||||
|
h.PositiveSpans = adjustForInserts(h.PositiveSpans, pBackwardInserts)
|
||||||
|
h.NegativeSpans = adjustForInserts(h.NegativeSpans, nBackwardInserts)
|
||||||
|
}
|
||||||
|
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
||||||
|
}
|
||||||
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
||||||
if appendOnly {
|
if appendOnly {
|
||||||
return nil, false, a, fmt.Errorf("float histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
|
return nil, false, a, fmt.Errorf("float histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
|
||||||
|
@ -784,13 +805,6 @@ func (a *FloatHistogramAppender) AppendFloatHistogram(prev *FloatHistogramAppend
|
||||||
app.(*FloatHistogramAppender).appendFloatHistogram(t, h)
|
app.(*FloatHistogramAppender).appendFloatHistogram(t, h)
|
||||||
return chk, true, app, nil
|
return chk, true, app, nil
|
||||||
}
|
}
|
||||||
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
|
||||||
// The histogram needs to be expanded to have the extra empty buckets
|
|
||||||
// of the chunk.
|
|
||||||
h.PositiveSpans = a.pSpans
|
|
||||||
h.NegativeSpans = a.nSpans
|
|
||||||
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
|
||||||
}
|
|
||||||
a.appendFloatHistogram(t, h)
|
a.appendFloatHistogram(t, h)
|
||||||
return nil, false, a, nil
|
return nil, false, a, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -411,6 +411,7 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
||||||
{Offset: 3, Length: 2},
|
{Offset: 3, Length: 2},
|
||||||
{Offset: 5, Length: 1},
|
{Offset: 5, Length: 1},
|
||||||
}
|
}
|
||||||
|
savedH2Spans := h2.PositiveSpans
|
||||||
h2.PositiveBuckets = []float64{7, 4, 3, 5, 2}
|
h2.PositiveBuckets = []float64{7, 4, 3, 5, 2}
|
||||||
|
|
||||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||||
|
@ -426,6 +427,43 @@ func TestFloatHistogramChunkAppendable(t *testing.T) {
|
||||||
// Check that h2 was recoded.
|
// Check that h2 was recoded.
|
||||||
require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2}, h2.PositiveBuckets)
|
require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2}, h2.PositiveBuckets)
|
||||||
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
|
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
|
||||||
|
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // New histogram that has new buckets AND buckets missing but the buckets missing were empty.
|
||||||
|
emptyBucketH := eh.Copy()
|
||||||
|
emptyBucketH.PositiveBuckets = []float64{6, 0, 3, 2, 4, 0, 1}
|
||||||
|
c, hApp, ts, h1 := setup(emptyBucketH)
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.PositiveSpans = []histogram.Span{
|
||||||
|
{Offset: 0, Length: 1},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 3, Length: 2},
|
||||||
|
{Offset: 5, Length: 2},
|
||||||
|
}
|
||||||
|
savedH2Spans := h2.PositiveSpans
|
||||||
|
h2.PositiveBuckets = []float64{7, 4, 3, 5, 2, 3}
|
||||||
|
|
||||||
|
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||||
|
require.NotEmpty(t, posInterjections)
|
||||||
|
require.Empty(t, negInterjections)
|
||||||
|
require.NotEmpty(t, backwardPositiveInserts)
|
||||||
|
require.Empty(t, backwardNegativeInserts)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.False(t, cr)
|
||||||
|
|
||||||
|
assertRecodedFloatHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||||
|
|
||||||
|
// Check that h2 was recoded.
|
||||||
|
require.Equal(t, []float64{7, 0, 4, 3, 5, 0, 2, 3}, h2.PositiveBuckets)
|
||||||
|
require.Equal(t, []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2}, // Added empty bucket.
|
||||||
|
{Offset: 2, Length: 1}, // Existing - offset adjusted.
|
||||||
|
{Offset: 3, Length: 2}, // Existing.
|
||||||
|
{Offset: 3, Length: 1}, // Added empty bucket.
|
||||||
|
{Offset: 1, Length: 2}, // Existing + the extra bucket.
|
||||||
|
}, h2.PositiveSpans)
|
||||||
|
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // New histogram that has a counter reset while buckets are same.
|
{ // New histogram that has a counter reset while buckets are same.
|
||||||
|
|
|
@ -437,6 +437,7 @@ loop:
|
||||||
// fill in the bucket in b and advance a.
|
// fill in the bucket in b and advance a.
|
||||||
if aCount == 0 {
|
if aCount == 0 {
|
||||||
bInter.num++ // Mark that we need to insert a bucket in b.
|
bInter.num++ // Mark that we need to insert a bucket in b.
|
||||||
|
bInter.bucketIdx = aIdx
|
||||||
// Advance a
|
// Advance a
|
||||||
if aInter.num > 0 {
|
if aInter.num > 0 {
|
||||||
aInserts = append(aInserts, aInter)
|
aInserts = append(aInserts, aInter)
|
||||||
|
@ -454,6 +455,7 @@ loop:
|
||||||
return nil, nil, false
|
return nil, nil, false
|
||||||
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
case aIdx > bIdx: // a misses a value that is in b. Forward b and recompare.
|
||||||
aInter.num++
|
aInter.num++
|
||||||
|
aInter.bucketIdx = bIdx
|
||||||
// Advance b
|
// Advance b
|
||||||
if bInter.num > 0 {
|
if bInter.num > 0 {
|
||||||
bInserts = append(bInserts, bInter)
|
bInserts = append(bInserts, bInter)
|
||||||
|
@ -471,6 +473,7 @@ loop:
|
||||||
// fill in the bucket in b and advance a.
|
// fill in the bucket in b and advance a.
|
||||||
if aCount == 0 {
|
if aCount == 0 {
|
||||||
bInter.num++
|
bInter.num++
|
||||||
|
bInter.bucketIdx = aIdx
|
||||||
// Advance a
|
// Advance a
|
||||||
if aInter.num > 0 {
|
if aInter.num > 0 {
|
||||||
aInserts = append(aInserts, aInter)
|
aInserts = append(aInserts, aInter)
|
||||||
|
@ -489,6 +492,7 @@ loop:
|
||||||
return nil, nil, false
|
return nil, nil, false
|
||||||
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
|
||||||
aInter.num++
|
aInter.num++
|
||||||
|
aInter.bucketIdx = bIdx
|
||||||
// Advance b
|
// Advance b
|
||||||
if bInter.num > 0 {
|
if bInter.num > 0 {
|
||||||
bInserts = append(bInserts, bInter)
|
bInserts = append(bInserts, bInter)
|
||||||
|
@ -807,6 +811,23 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
|
||||||
happ.appendHistogram(t, h)
|
happ.appendHistogram(t, h)
|
||||||
return newChunk, false, app, nil
|
return newChunk, false, app, nil
|
||||||
}
|
}
|
||||||
|
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
||||||
|
// The histogram needs to be expanded to have the extra empty buckets
|
||||||
|
// of the chunk.
|
||||||
|
if len(pForwardInserts) == 0 && len(nForwardInserts) == 0 {
|
||||||
|
// No new chunks from the histogram, so the spans of the appender can accommodate the new buckets.
|
||||||
|
// However we need to make a copy in case the input is sharing spans from an iterator.
|
||||||
|
h.PositiveSpans = make([]histogram.Span, len(a.pSpans))
|
||||||
|
copy(h.PositiveSpans, a.pSpans)
|
||||||
|
h.NegativeSpans = make([]histogram.Span, len(a.nSpans))
|
||||||
|
copy(h.NegativeSpans, a.nSpans)
|
||||||
|
} else {
|
||||||
|
// Spans need pre-adjusting to accommodate the new buckets.
|
||||||
|
h.PositiveSpans = adjustForInserts(h.PositiveSpans, pBackwardInserts)
|
||||||
|
h.NegativeSpans = adjustForInserts(h.NegativeSpans, nBackwardInserts)
|
||||||
|
}
|
||||||
|
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
||||||
|
}
|
||||||
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
|
||||||
if appendOnly {
|
if appendOnly {
|
||||||
return nil, false, a, fmt.Errorf("histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
|
return nil, false, a, fmt.Errorf("histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts))
|
||||||
|
@ -818,13 +839,6 @@ func (a *HistogramAppender) AppendHistogram(prev *HistogramAppender, t int64, h
|
||||||
app.(*HistogramAppender).appendHistogram(t, h)
|
app.(*HistogramAppender).appendHistogram(t, h)
|
||||||
return chk, true, app, nil
|
return chk, true, app, nil
|
||||||
}
|
}
|
||||||
if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 {
|
|
||||||
// The histogram needs to be expanded to have the extra empty buckets
|
|
||||||
// of the chunk.
|
|
||||||
h.PositiveSpans = a.pSpans
|
|
||||||
h.NegativeSpans = a.nSpans
|
|
||||||
a.recodeHistogram(h, pBackwardInserts, nBackwardInserts)
|
|
||||||
}
|
|
||||||
a.appendHistogram(t, h)
|
a.appendHistogram(t, h)
|
||||||
return nil, false, a, nil
|
return nil, false, a, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -278,6 +278,10 @@ func (b *bucketIterator) Next() (int, bool) {
|
||||||
type Insert struct {
|
type Insert struct {
|
||||||
pos int
|
pos int
|
||||||
num int
|
num int
|
||||||
|
|
||||||
|
// Optional: bucketIdx is the index of the bucket that is inserted.
|
||||||
|
// Can be used to adjust spans.
|
||||||
|
bucketIdx int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deprecated: expandSpansForward, use expandIntSpansAndBuckets or
|
// Deprecated: expandSpansForward, use expandIntSpansAndBuckets or
|
||||||
|
@ -577,3 +581,65 @@ func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterR
|
||||||
return histogram.UnknownCounterReset
|
return histogram.UnknownCounterReset
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// adjustForInserts adjusts the spans for the given inserts.
|
||||||
|
func adjustForInserts(spans []histogram.Span, inserts []Insert) (mergedSpans []histogram.Span) {
|
||||||
|
if len(inserts) == 0 {
|
||||||
|
return spans
|
||||||
|
}
|
||||||
|
|
||||||
|
it := newBucketIterator(spans)
|
||||||
|
|
||||||
|
var (
|
||||||
|
lastBucket int
|
||||||
|
i int
|
||||||
|
insertIdx = inserts[i].bucketIdx
|
||||||
|
insertNum = inserts[i].num
|
||||||
|
)
|
||||||
|
|
||||||
|
addBucket := func(b int) {
|
||||||
|
offset := b - lastBucket - 1
|
||||||
|
if offset == 0 && len(mergedSpans) > 0 {
|
||||||
|
mergedSpans[len(mergedSpans)-1].Length++
|
||||||
|
} else {
|
||||||
|
if len(mergedSpans) == 0 {
|
||||||
|
offset++
|
||||||
|
}
|
||||||
|
mergedSpans = append(mergedSpans, histogram.Span{
|
||||||
|
Offset: int32(offset),
|
||||||
|
Length: 1,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
lastBucket = b
|
||||||
|
}
|
||||||
|
consumeInsert := func() {
|
||||||
|
// Consume the insert.
|
||||||
|
insertNum--
|
||||||
|
if insertNum == 0 {
|
||||||
|
i++
|
||||||
|
if i < len(inserts) {
|
||||||
|
insertIdx = inserts[i].bucketIdx
|
||||||
|
insertNum = inserts[i].num
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
insertIdx++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bucket, ok := it.Next()
|
||||||
|
for ok {
|
||||||
|
if i < len(inserts) && insertIdx < bucket {
|
||||||
|
addBucket(insertIdx)
|
||||||
|
consumeInsert()
|
||||||
|
} else {
|
||||||
|
addBucket(bucket)
|
||||||
|
bucket, ok = it.Next()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i < len(inserts) {
|
||||||
|
addBucket(inserts[i].bucketIdx)
|
||||||
|
consumeInsert()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
|
@ -428,6 +428,7 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
||||||
{Offset: 4, Length: 1},
|
{Offset: 4, Length: 1},
|
||||||
{Offset: 1, Length: 1},
|
{Offset: 1, Length: 1},
|
||||||
}
|
}
|
||||||
|
savedH2Spans := h2.PositiveSpans
|
||||||
h2.PositiveBuckets = []int64{7, -5, 1, 0, 1} // counts: 7, 2, 3, 3, 4 (total 18)
|
h2.PositiveBuckets = []int64{7, -5, 1, 0, 1} // counts: 7, 2, 3, 3, 4 (total 18)
|
||||||
|
|
||||||
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||||
|
@ -443,6 +444,44 @@ func TestHistogramChunkAppendable(t *testing.T) {
|
||||||
// Check that h2 was recoded.
|
// Check that h2 was recoded.
|
||||||
require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 4 (total 18)
|
require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 4 (total 18)
|
||||||
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
|
require.Equal(t, emptyBucketH.PositiveSpans, h2.PositiveSpans)
|
||||||
|
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // New histogram that has new buckets AND buckets missing but the buckets missing were empty.
|
||||||
|
emptyBucketH := eh.Copy()
|
||||||
|
emptyBucketH.PositiveBuckets = []int64{6, -6, 1, 1, -2, 1, 1} // counts: 6, 0, 1, 2, 0, 1, 2 (total 12)
|
||||||
|
c, hApp, ts, h1 := setup(emptyBucketH)
|
||||||
|
h2 := h1.Copy()
|
||||||
|
h2.PositiveSpans = []histogram.Span{ // Missing buckets at offset 1 and 9.
|
||||||
|
{Offset: 0, Length: 1},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 3, Length: 1},
|
||||||
|
{Offset: 4, Length: 1},
|
||||||
|
{Offset: 1, Length: 2},
|
||||||
|
}
|
||||||
|
savedH2Spans := h2.PositiveSpans
|
||||||
|
h2.PositiveBuckets = []int64{7, -5, 1, 0, 1, 1} // counts: 7, 2, 3, 3, 4, 5 (total 23)
|
||||||
|
|
||||||
|
posInterjections, negInterjections, backwardPositiveInserts, backwardNegativeInserts, ok, cr := hApp.appendable(h2)
|
||||||
|
require.NotEmpty(t, posInterjections)
|
||||||
|
require.Empty(t, negInterjections)
|
||||||
|
require.NotEmpty(t, backwardPositiveInserts)
|
||||||
|
require.Empty(t, backwardNegativeInserts)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.False(t, cr)
|
||||||
|
|
||||||
|
assertRecodedHistogramChunkOnAppend(t, c, hApp, ts+1, h2, UnknownCounterReset)
|
||||||
|
|
||||||
|
// Check that h2 was recoded.
|
||||||
|
require.Equal(t, []int64{7, -7, 2, 1, -3, 3, 1, 1}, h2.PositiveBuckets) // counts: 7, 0, 2, 3 , 0, 3, 5 (total 23)
|
||||||
|
require.Equal(t, []histogram.Span{
|
||||||
|
{Offset: 0, Length: 2}, // Added empty bucket.
|
||||||
|
{Offset: 2, Length: 1}, // Existing - offset adjusted.
|
||||||
|
{Offset: 3, Length: 2}, // Added empty bucket.
|
||||||
|
{Offset: 3, Length: 1}, // Existing - offset adjusted.
|
||||||
|
{Offset: 1, Length: 2}, // Existing.
|
||||||
|
}, h2.PositiveSpans)
|
||||||
|
require.NotEqual(t, savedH2Spans, h2.PositiveSpans, "recoding must make a copy")
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // New histogram that has a counter reset while buckets are same.
|
{ // New histogram that has a counter reset while buckets are same.
|
||||||
|
|
Loading…
Reference in New Issue