Implement Gorilla-inspired chunk encoding

This is not a verbatim implementation of the Gorilla encoding.  First
of all, it could not, even if we wanted, because Prometheus has a
different chunking model (constant size, not constant time).  Second,
this adds a number of changes that improve the encoding in general or
at least for the specific use case of Prometheus (and are partially
only possible in the context of Prometheus). See comments in the code
for details.
pull/1491/head
beorn7 2016-03-12 21:34:51 +01:00
parent e83f05fe93
commit 8cdced3850
9 changed files with 1314 additions and 27 deletions

View File

@ -143,7 +143,7 @@ func init() {
)
cfg.fs.Var(
&local.DefaultChunkEncoding, "storage.local.chunk-encoding-version",
"Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).",
"Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding), 1 (double-delta encoding), and 2 (Gorilla-style encoding).",
)
// Index cache sizes.
cfg.fs.IntVar(

View File

@ -15,6 +15,7 @@ package local
import (
"container/list"
"errors"
"fmt"
"io"
"sort"
@ -29,6 +30,8 @@ import (
// The DefaultChunkEncoding can be changed via a flag.
var DefaultChunkEncoding = doubleDelta
var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries")
type chunkEncoding byte
// String implements flag.Value.
@ -43,6 +46,8 @@ func (ce *chunkEncoding) Set(s string) error {
*ce = delta
case "1":
*ce = doubleDelta
case "2":
*ce = gorilla
default:
return fmt.Errorf("invalid chunk encoding: %s", s)
}
@ -52,6 +57,7 @@ func (ce *chunkEncoding) Set(s string) error {
const (
delta chunkEncoding = iota
doubleDelta
gorilla
)
// chunkDesc contains meta-data for a chunk. Pay special attention to the
@ -306,6 +312,21 @@ func rangeValues(it chunkIterator, in metric.Interval) ([]model.SamplePair, erro
return result, it.err()
}
// addToOverflowChunk is a utility function that creates a new chunk as overflow
// chunk, addse the provided sample to it, and returns a chunk slice containing
// the provided old chunk followed by the new overflow chunk.
func addToOverflowChunk(c chunk, s model.SamplePair) ([]chunk, error) {
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{c, overflowChunks[0]}, nil
}
// transcodeAndAdd is a utility function that transcodes the dst chunk into the
// provided src chunk (plus the necessary overflow chunks) and then adds the
// provided sample. It returns the new chunks (transcoded plus overflow) with
// the new sample at the end.
func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) {
chunkOps.WithLabelValues(transcode).Inc()
@ -334,7 +355,7 @@ func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error)
}
// newChunk creates a new chunk according to the encoding set by the
// defaultChunkEncoding flag.
// DefaultChunkEncoding flag.
func newChunk() chunk {
chunk, err := newChunkForEncoding(DefaultChunkEncoding)
if err != nil {
@ -349,6 +370,8 @@ func newChunkForEncoding(encoding chunkEncoding) (chunk, error) {
return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil
case doubleDelta:
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil
case gorilla:
return newGorillaChunk(gorillaZeroEncoding), nil
default:
return nil, fmt.Errorf("unknown chunk encoding: %v", encoding)
}

View File

@ -74,6 +74,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod
// add implements chunk.
func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
if c.len() == 0 {
c = c[:deltaHeaderBytes]
binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp))
@ -86,11 +87,7 @@ func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
return addToOverflowChunk(&c, s)
}
baseValue := c.baseValue()
@ -130,11 +127,7 @@ func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
return addToOverflowChunk(&c, s)
}
offset := len(c)

View File

@ -81,6 +81,7 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub
// add implements chunk.
func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
if c.len() == 0 {
return c.addFirstSample(s), nil
}
@ -98,11 +99,7 @@ func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
return addToOverflowChunk(&c, s)
}
projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta()
@ -136,11 +133,7 @@ func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks, err := newChunk().add(s)
if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
return addToOverflowChunk(&c, s)
}
offset := len(c)

1103
storage/local/gorilla.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,75 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import "github.com/prometheus/common/model"
var (
// bit masks for consecutive bits in a byte at various offsets.
bitMask = [][]byte{
{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, // 0 bit
{0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01}, // 1 bit
{0xC0, 0x60, 0x30, 0x18, 0x0C, 0x06, 0x03, 0x01}, // 2 bit
{0xE0, 0x70, 0x38, 0x1C, 0x0E, 0x07, 0x03, 0x01}, // 3 bit
{0xF0, 0x78, 0x3C, 0x1E, 0x0F, 0x07, 0x03, 0x01}, // 4 bit
{0xF8, 0x7C, 0x3E, 0x1F, 0x0F, 0x07, 0x03, 0x01}, // 5 bit
{0xFC, 0x7E, 0x3F, 0x1F, 0x0F, 0x07, 0x03, 0x01}, // 6 bit
{0xFE, 0x7F, 0x3F, 0x1F, 0x0F, 0x07, 0x03, 0x01}, // 7 bit
{0xFF, 0x7F, 0x3F, 0x1F, 0x0F, 0x07, 0x03, 0x01}, // 8 bit
}
)
// isInt32 returns true if v can be represented as an int32.
func isInt32(v model.SampleValue) bool {
return model.SampleValue(int32(v)) == v
}
// countBits returs the number of leading zero bits and the number of
// significant bits after that in the given bit pattern. The maximum number of
// leading zeros is 31 (so that it can be represented by a 5bit number). Leading
// zeros beyond that are considered part of the significant bits.
func countBits(pattern uint64) (leading, significant byte) {
// TODO(beorn7): This would probably be faster with ugly endless switch
// statements.
if pattern == 0 {
return
}
for pattern < 1<<63 {
leading++
pattern <<= 1
}
for pattern > 0 {
significant++
pattern <<= 1
}
if leading > 31 { // 5 bit limit.
significant += leading - 31
leading = 31
}
return
}
// isSignedIntN returns if n can be represented as a signed int with the given
// bit length.
func isSignedIntN(i int64, n byte) bool {
upper := int64(1) << (n - 1)
if i >= upper {
return false
}
lower := upper - (1 << n)
if i < lower {
return false
}
return true
}

View File

@ -0,0 +1,52 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import "testing"
func TestCountBits(t *testing.T) {
for i := byte(0); i < 56; i++ {
for j := byte(0); j <= 8; j++ {
for k := byte(0); k < 8; k++ {
p := uint64(bitMask[j][k]) << i
gotLeading, gotSignificant := countBits(p)
wantLeading := 56 - i + k
wantSignificant := j
if j+k > 8 {
wantSignificant -= j + k - 8
}
if wantLeading > 31 {
wantSignificant += wantLeading - 31
wantLeading = 31
}
if p == 0 {
wantLeading = 0
wantSignificant = 0
}
if wantLeading != gotLeading {
t.Errorf(
"unexpected leading bit count for i=%d, j=%d, k=%d; want %d, got %d",
i, j, k, wantLeading, gotLeading,
)
}
if wantSignificant != gotSignificant {
t.Errorf(
"unexpected significant bit count for i=%d, j=%d, k=%d; want %d, got %d",
i, j, k, wantSignificant, gotSignificant,
)
}
}
}
}
}

View File

@ -653,6 +653,10 @@ func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 1)
}
func TestCheckpointAndLoadSeriesMapAndHeadsChunkType2(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 2)
}
func TestCheckpointAndLoadFPMappings(t *testing.T) {
p, closer := newTestPersistence(t, 1)
defer closer.Close()
@ -758,6 +762,10 @@ func TestFingerprintsModifiedBeforeChunkType1(t *testing.T) {
testFingerprintsModifiedBefore(t, 1)
}
func TestFingerprintsModifiedBeforeChunkType2(t *testing.T) {
testFingerprintsModifiedBefore(t, 2)
}
func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()
@ -822,6 +830,10 @@ func TestDropArchivedMetricChunkType1(t *testing.T) {
testDropArchivedMetric(t, 1)
}
func TestDropArchivedMetricChunkType2(t *testing.T) {
testDropArchivedMetric(t, 2)
}
type incrementalBatch struct {
fpToMetric index.FingerprintMetricMapping
expectedLnToLvs index.LabelNameLabelValuesMapping
@ -1002,6 +1014,10 @@ func TestIndexingChunkType1(t *testing.T) {
testIndexing(t, 1)
}
func TestIndexingChunkType2(t *testing.T) {
testIndexing(t, 2)
}
func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *persistence) {
p.waitForIndexing()
for fp, m := range indexedFpsToMetrics {

View File

@ -749,7 +749,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
for m := range s.fpToSeries.iter() {
s.fpLocker.Lock(m.fp)
defer s.fpLocker.Unlock(m.fp) // TODO remove, see below
var values []model.SamplePair
for _, cd := range m.series.chunkDescs {
if cd.isEvicted() {
@ -772,7 +772,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value)
}
}
s.fpLocker.Unlock(m.fp)
//s.fpLocker.Unlock(m.fp)
}
log.Info("test done, closing")
}
@ -785,6 +785,10 @@ func TestChunkType1(t *testing.T) {
testChunk(t, 1)
}
func TestChunkType2(t *testing.T) {
testChunk(t, 2)
}
func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
samples := make(model.Samples, 10000)
for i := range samples {
@ -859,6 +863,10 @@ func TestValueAtTimeChunkType1(t *testing.T) {
testValueAtOrBeforeTime(t, 1)
}
func TestValueAtTimeChunkType2(t *testing.T) {
testValueAtOrBeforeTime(t, 2)
}
func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
samples := make(model.Samples, 10000)
for i := range samples {
@ -937,6 +945,10 @@ func BenchmarkValueAtTimeChunkType1(b *testing.B) {
benchmarkValueAtOrBeforeTime(b, 1)
}
func BenchmarkValueAtTimeChunkType2(b *testing.B) {
benchmarkValueAtOrBeforeTime(b, 2)
}
func testRangeValues(t *testing.T, encoding chunkEncoding) {
samples := make(model.Samples, 10000)
for i := range samples {
@ -1089,6 +1101,10 @@ func TestRangeValuesChunkType1(t *testing.T) {
testRangeValues(t, 1)
}
func TestRangeValuesChunkType2(t *testing.T) {
testRangeValues(t, 2)
}
func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
samples := make(model.Samples, 10000)
for i := range samples {
@ -1133,6 +1149,10 @@ func BenchmarkRangeValuesChunkType1(b *testing.B) {
benchmarkRangeValues(b, 1)
}
func BenchmarkRangeValuesChunkType2(b *testing.B) {
benchmarkRangeValues(b, 2)
}
func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
samples := make(model.Samples, 10000)
for i := range samples {
@ -1284,6 +1304,10 @@ func TestEvictAndPurgeSeriesChunkType1(t *testing.T) {
testEvictAndPurgeSeries(t, 1)
}
func TestEvictAndPurgeSeriesChunkType2(t *testing.T) {
testEvictAndPurgeSeries(t, 2)
}
func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) {
samples := make(model.Samples, 10000)
for i := range samples {
@ -1418,6 +1442,10 @@ func TestFuzzChunkType1(t *testing.T) {
testFuzz(t, 1)
}
func TestFuzzChunkType2(t *testing.T) {
testFuzz(t, 2)
}
// benchmarkFuzz is the benchmark version of testFuzz. The storage options are
// set such that evictions, checkpoints, and purging will happen concurrently,
// too. This benchmark will have a very long runtime (up to minutes). You can
@ -1478,6 +1506,10 @@ func BenchmarkFuzzChunkType1(b *testing.B) {
benchmarkFuzz(b, 1)
}
func BenchmarkFuzzChunkType2(b *testing.B) {
benchmarkFuzz(b, 2)
}
func createRandomSamples(metricName string, minLen int) model.Samples {
type valueCreator func() model.SampleValue
type deltaApplier func(model.SampleValue) model.SampleValue
@ -1633,15 +1665,15 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples,
it := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp)
found := it.ValueAtOrBeforeTime(sample.Timestamp)
if found.Timestamp == model.Earliest {
t.Errorf("Sample %#v: Expected sample not found.", sample)
t.Errorf("Sample #%d %#v: Expected sample not found.", i, sample)
result = false
p.Close()
continue
}
if sample.Value != found.Value || sample.Timestamp != found.Timestamp {
t.Errorf(
"Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).",
sample.Value, sample.Timestamp, found.Value, found.Timestamp,
"Sample #%d %#v: Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).",
i, sample, sample.Value, sample.Timestamp, found.Value, found.Timestamp,
)
result = false
}