diff --git a/storage/local/gorilla.go b/storage/local/gorilla.go index b6fb713ed..4509f6250 100644 --- a/storage/local/gorilla.go +++ b/storage/local/gorilla.go @@ -745,10 +745,22 @@ func (c gorillaChunk) addBitPattern(offset uint16, pattern uint64, n uint16) uin newOffset = offset + n ) + // Clean up the parts of the footer we will write into. (But not more as + // we are still using the value related part of the footer when we have + // already overwritten timestamp related parts.) if newOffset > gorillaNextSampleBitOffsetThreshold { - // We'll write into the footer. Clean it first. - for i := gorillaNextSampleBitOffsetThreshold / 8; i < len(c); i++ { - c[i] = 0 + pos := offset + if pos < gorillaNextSampleBitOffsetThreshold { + pos = gorillaNextSampleBitOffsetThreshold + } + for pos < newOffset { + posInByte := pos % 8 + bitsToClear := newOffset - pos + if bitsToClear > 8-posInByte { + bitsToClear = 8 - posInByte + } + c[pos/8] &^= bitMask[bitsToClear][posInByte] + pos += bitsToClear } } @@ -1091,7 +1103,7 @@ func (it *gorillaChunkIterator) reset() { // reset, a chunk can be rewound again. func (it *gorillaChunkIterator) rewind(t model.Time, v model.SampleValue) { if it.rewound { - panic("cannet rewind Gorilla chunk twice") + panic("cannot rewind Gorilla chunk twice") } it.rewound = true it.nextT = it.t diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 6bde2740f..b871bef94 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -1426,7 +1426,10 @@ func testFuzz(t *testing.T, encoding chunkEncoding) { for _, sample := range samples { s.Append(sample) } - return verifyStorage(t, s, samples, 24*7*time.Hour) + if !verifyStorageRandom(t, s, samples) { + return false + } + return verifyStorageSequential(t, s, samples) } if err := quick.Check(check, nil); err != nil { @@ -1490,11 +1493,12 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { for _, sample := range samples[start:middle] { s.Append(sample) } - verifyStorage(b, s.(*memorySeriesStorage), samples[:middle], o.PersistenceRetentionPeriod) + verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:middle]) for _, sample := range samples[middle:end] { s.Append(sample) } - verifyStorage(b, s.(*memorySeriesStorage), samples[:end], o.PersistenceRetentionPeriod) + verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:end]) + verifyStorageSequential(b, s.(*memorySeriesStorage), samples) } } @@ -1515,12 +1519,11 @@ func createRandomSamples(metricName string, minLen int) model.Samples { type deltaApplier func(model.SampleValue) model.SampleValue var ( - maxMetrics = 5 - maxStreakLength = 500 - maxTimeDelta = 10000 - maxTimeDeltaFactor = 10 - timestamp = model.Now() - model.Time(maxTimeDelta*maxTimeDeltaFactor*minLen/4) // So that some timestamps are in the future. - generators = []struct { + maxMetrics = 5 + maxStreakLength = 2000 + maxTimeDelta = 10000 + timestamp = model.Now() - model.Time(maxTimeDelta*minLen) // So that some timestamps are in the future. + generators = []struct { createValue valueCreator applyDelta []deltaApplier }{ @@ -1564,6 +1567,28 @@ func createRandomSamples(metricName string, minLen int) model.Samples { }, }, } + timestampIncrementers = []func(baseDelta model.Time) model.Time{ + // Regular increments. + func(delta model.Time) model.Time { + return delta + }, + // Jittered increments. σ is 1/100 of delta, e.g. 10ms for 10s scrape interval. + func(delta model.Time) model.Time { + return delta + model.Time(rand.NormFloat64()*float64(delta)/100) + }, + // Regular increments, but missing a scrape with 10% chance. + func(delta model.Time) model.Time { + i := rand.Intn(100) + if i < 90 { + return delta + } + if i < 99 { + return 2 * delta + } + return 3 * delta + // Ignoring the case with more than two missed scrapes in a row. + }, + } ) // Prefill result with two samples with colliding metrics (to test fingerprint mapping). @@ -1595,13 +1620,16 @@ func createRandomSamples(metricName string, minLen int) model.Samples { } for len(result) < minLen { - // Pick a metric for this cycle. - metric := metrics[rand.Intn(len(metrics))] - timeDelta := rand.Intn(maxTimeDelta) + 1 - generator := generators[rand.Intn(len(generators))] - createValue := generator.createValue - applyDelta := generator.applyDelta[rand.Intn(len(generator.applyDelta))] - incTimestamp := func() { timestamp += model.Time(timeDelta * (rand.Intn(maxTimeDeltaFactor) + 1)) } + var ( + // Pick a metric for this cycle. + metric = metrics[rand.Intn(len(metrics))] + timeDelta = model.Time(rand.Intn(maxTimeDelta) + 1) + generator = generators[rand.Intn(len(generators))] + createValue = generator.createValue + applyDelta = generator.applyDelta[rand.Intn(len(generator.applyDelta))] + incTimestamp = timestampIncrementers[rand.Intn(len(timestampIncrementers))] + ) + switch rand.Intn(4) { case 0: // A single sample. result = append(result, &model.Sample{ @@ -1609,7 +1637,7 @@ func createRandomSamples(metricName string, minLen int) model.Samples { Value: createValue(), Timestamp: timestamp, }) - incTimestamp() + timestamp += incTimestamp(timeDelta) case 1: // A streak of random sample values. for n := rand.Intn(maxStreakLength); n >= 0; n-- { result = append(result, &model.Sample{ @@ -1617,7 +1645,7 @@ func createRandomSamples(metricName string, minLen int) model.Samples { Value: createValue(), Timestamp: timestamp, }) - incTimestamp() + timestamp += incTimestamp(timeDelta) } case 2: // A streak of sample values with incremental changes. value := createValue() @@ -1627,7 +1655,7 @@ func createRandomSamples(metricName string, minLen int) model.Samples { Value: value, Timestamp: timestamp, }) - incTimestamp() + timestamp += incTimestamp(timeDelta) value = applyDelta(value) } case 3: // A streak of constant sample values. @@ -1638,7 +1666,7 @@ func createRandomSamples(metricName string, minLen int) model.Samples { Value: value, Timestamp: timestamp, }) - incTimestamp() + timestamp += incTimestamp(timeDelta) } } } @@ -1646,31 +1674,29 @@ func createRandomSamples(metricName string, minLen int) model.Samples { return result } -func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples, maxAge time.Duration) bool { +func verifyStorageRandom(t testing.TB, s *memorySeriesStorage, samples model.Samples) bool { s.WaitForIndexing() result := true for _, i := range rand.Perm(len(samples)) { sample := samples[i] - if sample.Timestamp.Before(model.TimeFromUnixNano(time.Now().Add(-maxAge).UnixNano())) { - continue - // TODO: Once we have a guaranteed cutoff at the - // retention period, we can verify here that no results - // are returned. - } fp, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) if err != nil { t.Fatal(err) } p := s.NewPreloader() - it := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) + it := p.PreloadInstant(fp, sample.Timestamp, 0) found := it.ValueAtOrBeforeTime(sample.Timestamp) - if found.Timestamp == model.Earliest { + startTime := it.(*boundedIterator).start + switch { + case found.Timestamp != model.Earliest && sample.Timestamp.Before(startTime): + t.Errorf("Sample #%d %#v: Expected outdated sample to be excluded.", i, sample) + result = false + case found.Timestamp == model.Earliest && !sample.Timestamp.Before(startTime): t.Errorf("Sample #%d %#v: Expected sample not found.", i, sample) result = false - p.Close() - continue - } - if sample.Value != found.Value || sample.Timestamp != found.Timestamp { + case found.Timestamp == model.Earliest && sample.Timestamp.Before(startTime): + // All good. Outdated sample dropped. + case sample.Value != found.Value || sample.Timestamp != found.Timestamp: t.Errorf( "Sample #%d %#v: Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).", i, sample, sample.Value, sample.Timestamp, found.Value, found.Timestamp, @@ -1682,6 +1708,60 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples, return result } +func verifyStorageSequential(t testing.TB, s *memorySeriesStorage, samples model.Samples) bool { + s.WaitForIndexing() + var ( + result = true + fp model.Fingerprint + p = s.NewPreloader() + it SeriesIterator + r []model.SamplePair + j int + ) + defer func() { + p.Close() + }() + for i, sample := range samples { + newFP, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) + if err != nil { + t.Fatal(err) + } + if it == nil || newFP != fp { + fp = newFP + p.Close() + p = s.NewPreloader() + it = p.PreloadRange(fp, sample.Timestamp, model.Latest) + r = it.RangeValues(metric.Interval{ + OldestInclusive: sample.Timestamp, + NewestInclusive: model.Latest, + }) + j = -1 + } + startTime := it.(*boundedIterator).start + if sample.Timestamp.Before(startTime) { + continue + } + j++ + if j >= len(r) { + t.Errorf( + "Sample #%d %v not found.", + i, sample, + ) + result = false + continue + } + found := r[j] + if sample.Value != found.Value || sample.Timestamp != found.Timestamp { + t.Errorf( + "Sample #%d %v: Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).", + i, sample, sample.Value, sample.Timestamp, found.Value, found.Timestamp, + ) + result = false + } + } + return result +} + func TestAppendOutOfOrder(t *testing.T) { s, closer := NewTestStorage(t, 1) defer closer.Close()