Make chunk iterators more DRY

This finally extracts all the common code of the two chunk iterators
into one. Any future chunk encodings with fast access by index can use
the same iterator by simply providing an indexAccessor. Other future
chunk encodings without fast index access (like Gorilla-style) can
still implement the chunkIterator interface as usual.
pull/1477/head
beorn7 2016-03-07 20:23:14 +01:00
parent 32f280a3cd
commit c13b1ecfe9
3 changed files with 191 additions and 247 deletions

View File

@ -17,6 +17,7 @@ import (
"container/list"
"fmt"
"io"
"sort"
"sync"
"sync/atomic"
@ -342,3 +343,102 @@ func newChunkForEncoding(encoding chunkEncoding) (chunk, error) {
return nil, fmt.Errorf("unknown chunk encoding: %v", encoding)
}
}
// indexAccessor allows accesses to samples by index.
type indexAccessor interface {
timestampAtIndex(int) model.Time
sampleValueAtIndex(int) model.SampleValue
err() error
}
// indexAccessingChunkIterator is a chunk iterator for chunks for which an
// indexAccessor implementation exists.
type indexAccessingChunkIterator struct {
len int
pos int
lastValue model.SamplePair
acc indexAccessor
}
func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingChunkIterator {
return &indexAccessingChunkIterator{
len: len,
pos: -1,
lastValue: ZeroSamplePair,
acc: acc,
}
}
// lastTimestamp implements chunkIterator.
func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) {
return it.acc.timestampAtIndex(it.len - 1), it.acc.err()
}
// valueAtOrBeforeTime implements chunkIterator.
func (it *indexAccessingChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
i := sort.Search(it.len, func(i int) bool {
return it.acc.timestampAtIndex(i).After(t)
})
if i == 0 || it.acc.err() != nil {
return ZeroSamplePair, it.acc.err()
}
it.pos = i - 1
it.lastValue = model.SamplePair{
Timestamp: it.acc.timestampAtIndex(i - 1),
Value: it.acc.sampleValueAtIndex(i - 1),
}
return it.lastValue, it.acc.err()
}
// rangeValues implements chunkIterator.
func (it *indexAccessingChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
oldest := sort.Search(it.len, func(i int) bool {
return !it.acc.timestampAtIndex(i).Before(in.OldestInclusive)
})
newest := sort.Search(it.len, func(i int) bool {
return it.acc.timestampAtIndex(i).After(in.NewestInclusive)
})
if oldest == it.len || it.acc.err() != nil {
return nil, it.acc.err()
}
result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ {
it.pos = i
it.lastValue = model.SamplePair{
Timestamp: it.acc.timestampAtIndex(i),
Value: it.acc.sampleValueAtIndex(i),
}
result = append(result, it.lastValue)
}
return result, it.acc.err()
}
// contains implements chunkIterator.
func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) {
return !t.Before(it.acc.timestampAtIndex(0)) &&
!t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err()
}
// scan implements chunkIterator.
func (it *indexAccessingChunkIterator) scan() bool {
it.pos++
if it.pos >= it.len {
return false
}
it.lastValue = model.SamplePair{
Timestamp: it.acc.timestampAtIndex(it.pos),
Value: it.acc.sampleValueAtIndex(it.pos),
}
return it.acc.err() == nil
}
// value implements chunkIterator.
func (it *indexAccessingChunkIterator) value() model.SamplePair {
return it.lastValue
}
// err implements chunkIterator.
func (it *indexAccessingChunkIterator) err() error {
return it.acc.err()
}

View File

@ -18,11 +18,8 @@ import (
"fmt"
"io"
"math"
"sort"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
)
// The 21-byte header of a delta-encoded chunk looks like:
@ -201,17 +198,14 @@ func (c deltaEncodedChunk) firstTime() model.Time {
// newIterator implements chunk.
func (c *deltaEncodedChunk) newIterator() chunkIterator {
return &deltaEncodedChunkIterator{
c: *c,
len: c.len(),
baseT: c.baseTime(),
baseV: c.baseValue(),
tBytes: c.timeBytes(),
vBytes: c.valueBytes(),
isInt: c.isInt(),
pos: -1,
lastValue: ZeroSamplePair,
}
return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{
c: *c,
baseT: c.baseTime(),
baseV: c.baseValue(),
tBytes: c.timeBytes(),
vBytes: c.valueBytes(),
isInt: c.isInt(),
})
}
// marshal implements chunk.
@ -305,137 +299,65 @@ func (c deltaEncodedChunk) len() int {
return (len(c) - deltaHeaderBytes) / c.sampleSize()
}
// deltaEncodedChunkIterator implements chunkIterator.
type deltaEncodedChunkIterator struct {
// deltaEncodedIndexAccessor implements indexAccessor.
type deltaEncodedIndexAccessor struct {
c deltaEncodedChunk
len int
baseT model.Time
baseV model.SampleValue
tBytes, vBytes deltaBytes
isInt bool
pos int
lastValue model.SamplePair
lastErr error
}
// lastTimestamp implements chunkIterator.
func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
return it.timestampAtIndex(it.len - 1), it.lastErr
func (acc *deltaEncodedIndexAccessor) err() error {
return acc.lastErr
}
// valueAtOrBeforeTime implements chunkIterator.
func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
i := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(t)
})
if i == 0 || it.lastErr != nil {
return ZeroSamplePair, it.lastErr
}
it.pos = i - 1
it.lastValue = model.SamplePair{
Timestamp: it.timestampAtIndex(i - 1),
Value: it.sampleValueAtIndex(i - 1),
}
return it.lastValue, it.lastErr
}
func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time {
offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes)
// rangeValues implements chunkIterator.
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
oldest := sort.Search(it.len, func(i int) bool {
return !it.timestampAtIndex(i).Before(in.OldestInclusive)
})
newest := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(in.NewestInclusive)
})
if oldest == it.len || it.lastErr != nil {
return nil, it.lastErr
}
result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ {
it.pos = i
it.lastValue = model.SamplePair{
Timestamp: it.timestampAtIndex(i),
Value: it.sampleValueAtIndex(i),
}
result = append(result, it.lastValue)
}
return result, it.lastErr
}
// contains implements chunkIterator.
func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr
}
// scan implements chunkIterator.
func (it *deltaEncodedChunkIterator) scan() bool {
it.pos++
if it.pos >= it.len {
return false
}
it.lastValue = model.SamplePair{
Timestamp: it.timestampAtIndex(it.pos),
Value: it.sampleValueAtIndex(it.pos),
}
return it.lastErr == nil
}
// value implements chunkIterator.
func (it *deltaEncodedChunkIterator) value() model.SamplePair {
return it.lastValue
}
// err implements chunkIterator.
func (it *deltaEncodedChunkIterator) err() error {
return it.lastErr
}
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
switch it.tBytes {
switch acc.tBytes {
case d1:
return it.baseT + model.Time(uint8(it.c[offset]))
return acc.baseT + model.Time(uint8(acc.c[offset]))
case d2:
return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:]))
return acc.baseT + model.Time(binary.LittleEndian.Uint16(acc.c[offset:]))
case d4:
return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:]))
return acc.baseT + model.Time(binary.LittleEndian.Uint32(acc.c[offset:]))
case d8:
// Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:]))
return model.Time(binary.LittleEndian.Uint64(acc.c[offset:]))
default:
it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes)
}
return model.Earliest
}
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue {
offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) + int(acc.tBytes)
if it.isInt {
switch it.vBytes {
if acc.isInt {
switch acc.vBytes {
case d0:
return it.baseV
return acc.baseV
case d1:
return it.baseV + model.SampleValue(int8(it.c[offset]))
return acc.baseV + model.SampleValue(int8(acc.c[offset]))
case d2:
return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
return acc.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
case d4:
return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
return acc.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
// No d8 for ints.
default:
it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
}
} else {
switch it.vBytes {
switch acc.vBytes {
case d4:
return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
return acc.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:])))
case d8:
// Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:])))
default:
it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
}
}
return 0

View File

@ -18,11 +18,8 @@ import (
"fmt"
"io"
"math"
"sort"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
)
// The 37-byte header of a delta-encoded chunk looks like:
@ -207,19 +204,16 @@ func (c doubleDeltaEncodedChunk) firstTime() model.Time {
// newIterator implements chunk.
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
return &doubleDeltaEncodedChunkIterator{
c: *c,
len: c.len(),
baseT: c.baseTime(),
baseΔT: c.baseTimeDelta(),
baseV: c.baseValue(),
baseΔV: c.baseValueDelta(),
tBytes: c.timeBytes(),
vBytes: c.valueBytes(),
isInt: c.isInt(),
pos: -1,
lastValue: ZeroSamplePair,
}
return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{
c: *c,
baseT: c.baseTime(),
baseΔT: c.baseTimeDelta(),
baseV: c.baseValue(),
baseΔV: c.baseValueDelta(),
tBytes: c.timeBytes(),
vBytes: c.valueBytes(),
isInt: c.isInt(),
})
}
// marshal implements chunk.
@ -411,176 +405,104 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt
return []chunk{&c}, nil
}
// doubleDeltaEncodedChunkIterator implements chunkIterator.
type doubleDeltaEncodedChunkIterator struct {
// doubleDeltaEncodedIndexAccessor implements indexAccessor.
type doubleDeltaEncodedIndexAccessor struct {
c doubleDeltaEncodedChunk
len int
baseT, baseΔT model.Time
baseV, baseΔV model.SampleValue
tBytes, vBytes deltaBytes
isInt bool
pos int
lastValue model.SamplePair
lastErr error
}
// lastTimestamp implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
return it.timestampAtIndex(it.len - 1), it.lastErr
func (acc *doubleDeltaEncodedIndexAccessor) err() error {
return acc.lastErr
}
// valueAtOrBeforeTime implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
i := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(t)
})
if i == 0 || it.lastErr != nil {
return ZeroSamplePair, it.lastErr
}
it.pos = i - 1
it.lastValue = model.SamplePair{
Timestamp: it.timestampAtIndex(i - 1),
Value: it.sampleValueAtIndex(i - 1),
}
return it.lastValue, it.lastErr
}
// rangeValues implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
oldest := sort.Search(it.len, func(i int) bool {
return !it.timestampAtIndex(i).Before(in.OldestInclusive)
})
newest := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(in.NewestInclusive)
})
if oldest == it.len || it.lastErr != nil {
return nil, it.lastErr
}
result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ {
it.pos = i
it.lastValue = model.SamplePair{
Timestamp: it.timestampAtIndex(i),
Value: it.sampleValueAtIndex(i),
}
result = append(result, it.lastValue)
}
return result, it.lastErr
}
// contains implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)), it.lastErr
}
// scan implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) scan() bool {
it.pos++
if it.pos >= it.len {
return false
}
it.lastValue = model.SamplePair{
Timestamp: it.timestampAtIndex(it.pos),
Value: it.sampleValueAtIndex(it.pos),
}
return it.lastErr == nil
}
// value implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) value() model.SamplePair {
return it.lastValue
}
// err implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) err() error {
return it.lastErr
}
func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time {
func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time {
if idx == 0 {
return it.baseT
return acc.baseT
}
if idx == 1 {
// If time bytes are at d8, the time is saved directly rather
// than as a difference.
if it.tBytes == d8 {
return it.baseΔT
if acc.tBytes == d8 {
return acc.baseΔT
}
return it.baseT + it.baseΔT
return acc.baseT + acc.baseΔT
}
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes)
offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes)
switch it.tBytes {
switch acc.tBytes {
case d1:
return it.baseT +
model.Time(idx)*it.baseΔT +
model.Time(int8(it.c[offset]))
return acc.baseT +
model.Time(idx)*acc.baseΔT +
model.Time(int8(acc.c[offset]))
case d2:
return it.baseT +
model.Time(idx)*it.baseΔT +
model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:])))
return acc.baseT +
model.Time(idx)*acc.baseΔT +
model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
case d4:
return it.baseT +
model.Time(idx)*it.baseΔT +
model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:])))
return acc.baseT +
model.Time(idx)*acc.baseΔT +
model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
case d8:
// Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:]))
return model.Time(binary.LittleEndian.Uint64(acc.c[offset:]))
default:
it.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes)
}
return model.Earliest
}
func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue {
func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue {
if idx == 0 {
return it.baseV
return acc.baseV
}
if idx == 1 {
// If value bytes are at d8, the value is saved directly rather
// than as a difference.
if it.vBytes == d8 {
return it.baseΔV
if acc.vBytes == d8 {
return acc.baseΔV
}
return it.baseV + it.baseΔV
return acc.baseV + acc.baseΔV
}
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes)
offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes)
if it.isInt {
switch it.vBytes {
if acc.isInt {
switch acc.vBytes {
case d0:
return it.baseV +
model.SampleValue(idx)*it.baseΔV
return acc.baseV +
model.SampleValue(idx)*acc.baseΔV
case d1:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int8(it.c[offset]))
return acc.baseV +
model.SampleValue(idx)*acc.baseΔV +
model.SampleValue(int8(acc.c[offset]))
case d2:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
return acc.baseV +
model.SampleValue(idx)*acc.baseΔV +
model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
case d4:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
return acc.baseV +
model.SampleValue(idx)*acc.baseΔV +
model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
// No d8 for ints.
default:
it.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
}
} else {
switch it.vBytes {
switch acc.vBytes {
case d4:
return it.baseV +
model.SampleValue(idx)*it.baseΔV +
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
return acc.baseV +
model.SampleValue(idx)*acc.baseΔV +
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:])))
case d8:
// Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:])))
default:
it.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
}
}
return 0