You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
prometheus/storage/local/doubledelta.go

534 lines
16 KiB

// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"encoding/binary"
"fmt"
"io"
"math"
"sort"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
)
// The 37-byte header of a delta-encoded chunk looks like:
//
// - used buf bytes: 2 bytes
// - time double-delta bytes: 1 bytes
// - value double-delta bytes: 1 bytes
// - is integer: 1 byte
// - base time: 8 bytes
// - base value: 8 bytes
// - base time delta: 8 bytes
// - base value delta: 8 bytes
const (
doubleDeltaHeaderBytes = 37
doubleDeltaHeaderBufLenOffset = 0
doubleDeltaHeaderTimeBytesOffset = 2
doubleDeltaHeaderValueBytesOffset = 3
doubleDeltaHeaderIsIntOffset = 4
doubleDeltaHeaderBaseTimeOffset = 5
doubleDeltaHeaderBaseValueOffset = 13
doubleDeltaHeaderBaseTimeDeltaOffset = 21
doubleDeltaHeaderBaseValueDeltaOffset = 29
)
// A doubleDeltaEncodedChunk adaptively stores sample timestamps and values with
// a double-delta encoding of various types (int, float) and bit widths. A base
// value and timestamp and a base delta for each is saved in the header. The
// payload consists of double-deltas, i.e. deviations from the values and
// timestamps calculated by applying the base value and time and the base deltas.
// However, once 8 bytes would be needed to encode a double-delta value, a
// fall-back to the absolute numbers happens (so that timestamps are saved
// directly as int64 and values as float64).
// doubleDeltaEncodedChunk implements the chunk interface.
type doubleDeltaEncodedChunk []byte
// newDoubleDeltaEncodedChunk returns a newly allocated doubleDeltaEncodedChunk.
func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doubleDeltaEncodedChunk {
if tb < 1 {
panic("need at least 1 time delta byte")
}
if length < doubleDeltaHeaderBytes+16 {
panic(fmt.Errorf(
"chunk length %d bytes is insufficient, need at least %d",
length, doubleDeltaHeaderBytes+16,
))
}
c := make(doubleDeltaEncodedChunk, doubleDeltaHeaderIsIntOffset+1, length)
c[doubleDeltaHeaderTimeBytesOffset] = byte(tb)
c[doubleDeltaHeaderValueBytesOffset] = byte(vb)
if vb < d8 && isInt { // Only use int for fewer than 8 value double-delta bytes.
c[doubleDeltaHeaderIsIntOffset] = 1
} else {
c[doubleDeltaHeaderIsIntOffset] = 0
}
return &c
}
// add implements chunk.
func (c doubleDeltaEncodedChunk) add(s *metric.SamplePair) []chunk {
if c.len() == 0 {
return c.addFirstSample(s)
}
tb := c.timeBytes()
vb := c.valueBytes()
if c.len() == 1 {
return c.addSecondSample(s, tb, vb)
}
remainingBytes := cap(c) - len(c)
sampleSize := c.sampleSize()
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
}
projectedTime := c.baseTime() + clientmodel.Timestamp(c.len())*c.baseTimeDelta()
ddt := s.Timestamp - projectedTime
projectedValue := c.baseValue() + clientmodel.SampleValue(c.len())*c.baseValueDelta()
ddv := s.Value - projectedValue
ntb, nvb, nInt := tb, vb, c.isInt()
// If the new sample is incompatible with the current encoding, reencode the
// existing chunk data into new chunk(s).
if c.isInt() && !isInt64(ddv) {
// int->float.
nvb = d4
nInt = false
} else if !c.isInt() && vb == d4 && projectedValue+clientmodel.SampleValue(float32(ddv)) != s.Value {
// float32->float64.
nvb = d8
} else {
if tb < d8 {
// Maybe more bytes for timestamp.
ntb = max(tb, bytesNeededForSignedTimestampDelta(ddt))
}
if c.isInt() && vb < d8 {
// Maybe more bytes for sample value.
nvb = max(vb, bytesNeededForIntegerSampleValueDelta(ddv))
}
}
if tb != ntb || vb != nvb || c.isInt() != nInt {
if len(c)*2 < cap(c) {
return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks := newChunk().add(s)
return []chunk{&c, overflowChunks[0]}
}
offset := len(c)
c = c[:offset+sampleSize]
switch tb {
case d1:
c[offset] = byte(ddt)
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(ddt))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(ddt))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default:
panic("invalid number of bytes for time delta")
}
offset += int(tb)
if c.isInt() {
switch vb {
case d0:
// No-op. Constant delta is stored as base value.
case d1:
c[offset] = byte(ddv)
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(ddv))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(ddv))
// d8 must not happen. Those samples are encoded as float64.
default:
panic("invalid number of bytes for integer delta")
}
} else {
switch vb {
case d4:
binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv)))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default:
panic("invalid number of bytes for floating point delta")
}
}
return []chunk{&c}
}
// clone implements chunk.
func (c doubleDeltaEncodedChunk) clone() chunk {
clone := make(doubleDeltaEncodedChunk, len(c), cap(c))
copy(clone, c)
return &clone
}
// firstTime implements chunk.
func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp {
return c.baseTime()
}
// lastTime implements chunk.
func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp {
return c.valueAtIndex(c.len() - 1).Timestamp
}
// newIterator implements chunk.
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
return &doubleDeltaEncodedChunkIterator{
chunk: c,
}
}
// marshal implements chunk.
func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.")
}
binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
n, err := w.Write(c[:cap(c)])
if err != nil {
return err
}
if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n)
}
return nil
}
// unmarshal implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
if _, err := io.ReadFull(r, *c); err != nil {
return err
}
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
return nil
}
// unmarshalFromBuf implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) {
*c = (*c)[:cap(*c)]
copy(*c, buf)
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
}
// values implements chunk.
func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair {
n := c.len()
valuesChan := make(chan *metric.SamplePair)
go func() {
for i := 0; i < n; i++ {
valuesChan <- c.valueAtIndex(i)
}
close(valuesChan)
}()
return valuesChan
}
// encoding implements chunk.
func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta }
func (c doubleDeltaEncodedChunk) baseTime() clientmodel.Timestamp {
return clientmodel.Timestamp(
binary.LittleEndian.Uint64(
c[doubleDeltaHeaderBaseTimeOffset:],
),
)
}
func (c doubleDeltaEncodedChunk) baseValue() clientmodel.SampleValue {
return clientmodel.SampleValue(
math.Float64frombits(
binary.LittleEndian.Uint64(
c[doubleDeltaHeaderBaseValueOffset:],
),
),
)
}
func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp {
return clientmodel.Timestamp(
binary.LittleEndian.Uint64(
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
),
)
}
func (c doubleDeltaEncodedChunk) baseValueDelta() clientmodel.SampleValue {
return clientmodel.SampleValue(
math.Float64frombits(
binary.LittleEndian.Uint64(
c[doubleDeltaHeaderBaseValueDeltaOffset:],
),
),
)
}
func (c doubleDeltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[doubleDeltaHeaderTimeBytesOffset])
}
func (c doubleDeltaEncodedChunk) valueBytes() deltaBytes {
return deltaBytes(c[doubleDeltaHeaderValueBytesOffset])
}
func (c doubleDeltaEncodedChunk) sampleSize() int {
return int(c.timeBytes() + c.valueBytes())
}
func (c doubleDeltaEncodedChunk) len() int {
if len(c) <= doubleDeltaHeaderIsIntOffset+1 {
return 0
}
if len(c) <= doubleDeltaHeaderBaseValueOffset+8 {
return 1
}
return (len(c)-doubleDeltaHeaderBytes)/c.sampleSize() + 2
}
func (c doubleDeltaEncodedChunk) isInt() bool {
return c[doubleDeltaHeaderIsIntOffset] == 1
}
// addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value.
func (c doubleDeltaEncodedChunk) addFirstSample(s *metric.SamplePair) []chunk {
c = c[:doubleDeltaHeaderBaseValueOffset+8]
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeOffset:],
uint64(s.Timestamp),
)
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseValueOffset:],
math.Float64bits(float64(s.Value)),
)
return []chunk{&c}
}
// addSecondSample is a helper method only used by c.add(). It calculates the
// base delta from the provided sample and adds it to the chunk.
func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb deltaBytes) []chunk {
baseTimeDelta := s.Timestamp - c.baseTime()
if baseTimeDelta < 0 {
// TODO(beorn7): We ignore this irregular case for now. Once
// https://github.com/prometheus/prometheus/issues/481 is
// fixed, we should panic here instead.
return []chunk{&c}
}
c = c[:doubleDeltaHeaderBytes]
if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
// If already the base delta needs d8 (or we are at d8
// already, anyway), we better encode this timestamp
// directly rather than as a delta and switch everything
// to d8.
c[doubleDeltaHeaderTimeBytesOffset] = byte(d8)
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
uint64(s.Timestamp),
)
} else {
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
uint64(baseTimeDelta),
)
}
baseValue := c.baseValue()
baseValueDelta := s.Value - baseValue
if vb >= d8 || baseValue+baseValueDelta != s.Value {
// If we can't reproduce the original sample value (or
// if we are at d8 already, anyway), we better encode
// this value directly rather than as a delta and switch
// everything to d8.
c[doubleDeltaHeaderValueBytesOffset] = byte(d8)
c[doubleDeltaHeaderIsIntOffset] = 0
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseValueDeltaOffset:],
math.Float64bits(float64(s.Value)),
)
} else {
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseValueDeltaOffset:],
math.Float64bits(float64(baseValueDelta)),
)
}
return []chunk{&c}
}
func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
if idx == 0 {
return &metric.SamplePair{
Timestamp: c.baseTime(),
Value: c.baseValue(),
}
}
if idx == 1 {
// If time and/or value bytes are at d8, the time and value is
// saved directly rather than as a difference.
timestamp := c.baseTimeDelta()
if c.timeBytes() < d8 {
timestamp += c.baseTime()
}
value := c.baseValueDelta()
if c.valueBytes() < d8 {
value += c.baseValue()
}
return &metric.SamplePair{
Timestamp: timestamp,
Value: value,
}
}
offset := doubleDeltaHeaderBytes + (idx-2)*c.sampleSize()
var ts clientmodel.Timestamp
switch c.timeBytes() {
case d1:
ts = c.baseTime() +
clientmodel.Timestamp(idx)*c.baseTimeDelta() +
clientmodel.Timestamp(int8(c[offset]))
case d2:
ts = c.baseTime() +
clientmodel.Timestamp(idx)*c.baseTimeDelta() +
clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(c[offset:])))
case d4:
ts = c.baseTime() +
clientmodel.Timestamp(idx)*c.baseTimeDelta() +
clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(c[offset:])))
case d8:
// Take absolute value for d8.
ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:]))
default:
panic("Invalid number of bytes for time delta")
}
offset += int(c.timeBytes())
var v clientmodel.SampleValue
if c.isInt() {
switch c.valueBytes() {
case d0:
v = c.baseValue() +
clientmodel.SampleValue(idx)*c.baseValueDelta()
case d1:
v = c.baseValue() +
clientmodel.SampleValue(idx)*c.baseValueDelta() +
clientmodel.SampleValue(int8(c[offset]))
case d2:
v = c.baseValue() +
clientmodel.SampleValue(idx)*c.baseValueDelta() +
clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:])))
case d4:
v = c.baseValue() +
clientmodel.SampleValue(idx)*c.baseValueDelta() +
clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:])))
// No d8 for ints.
default:
panic("Invalid number of bytes for integer delta")
}
} else {
switch c.valueBytes() {
case d4:
v = c.baseValue() +
clientmodel.SampleValue(idx)*c.baseValueDelta() +
clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:])))
case d8:
// Take absolute value for d8.
v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:])))
default:
panic("Invalid number of bytes for floating point delta")
}
}
return &metric.SamplePair{
Timestamp: ts,
Value: v,
}
}
// doubleDeltaEncodedChunkIterator implements chunkIterator.
type doubleDeltaEncodedChunkIterator struct {
chunk *doubleDeltaEncodedChunk
// TODO(beorn7): add more fields here to keep track of last position.
}
// getValueAtTime implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values {
// TODO(beorn7): Implement in a more efficient way making use of the
// state of the iterator and internals of the doubleDeltaChunk.
i := sort.Search(it.chunk.len(), func(i int) bool {
return !it.chunk.valueAtIndex(i).Timestamp.Before(t)
})
switch i {
case 0:
return metric.Values{*it.chunk.valueAtIndex(0)}
case it.chunk.len():
return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)}
default:
v := it.chunk.valueAtIndex(i)
if v.Timestamp.Equal(t) {
return metric.Values{*v}
}
return metric.Values{*it.chunk.valueAtIndex(i - 1), *v}
}
}
// getRangeValues implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values {
// TODO(beorn7): Implement in a more efficient way making use of the
// state of the iterator and internals of the doubleDeltaChunk.
oldest := sort.Search(it.chunk.len(), func(i int) bool {
return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive)
})
newest := sort.Search(it.chunk.len(), func(i int) bool {
return it.chunk.valueAtIndex(i).Timestamp.After(in.NewestInclusive)
})
if oldest == it.chunk.len() {
return nil
}
result := make(metric.Values, 0, newest-oldest)
for i := oldest; i < newest; i++ {
result = append(result, *it.chunk.valueAtIndex(i))
}
return result
}
// contains implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool {
return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime())
}