mirror of https://github.com/prometheus/prometheus
512 lines
15 KiB
Go
512 lines
15 KiB
Go
// Copyright 2014 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package chunk
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
|
|
"github.com/prometheus/common/model"
|
|
)
|
|
|
|
// The 37-byte header of a delta-encoded chunk looks like:
|
|
//
|
|
// - used buf bytes: 2 bytes
|
|
// - time double-delta bytes: 1 bytes
|
|
// - value double-delta bytes: 1 bytes
|
|
// - is integer: 1 byte
|
|
// - base time: 8 bytes
|
|
// - base value: 8 bytes
|
|
// - base time delta: 8 bytes
|
|
// - base value delta: 8 bytes
|
|
const (
|
|
doubleDeltaHeaderBytes = 37
|
|
doubleDeltaHeaderMinBytes = 21 // header isn't full for chunk w/ one sample
|
|
|
|
doubleDeltaHeaderBufLenOffset = 0
|
|
doubleDeltaHeaderTimeBytesOffset = 2
|
|
doubleDeltaHeaderValueBytesOffset = 3
|
|
doubleDeltaHeaderIsIntOffset = 4
|
|
doubleDeltaHeaderBaseTimeOffset = 5
|
|
doubleDeltaHeaderBaseValueOffset = 13
|
|
doubleDeltaHeaderBaseTimeDeltaOffset = 21
|
|
doubleDeltaHeaderBaseValueDeltaOffset = 29
|
|
)
|
|
|
|
// A doubleDeltaEncodedChunk adaptively stores sample timestamps and values with
|
|
// a double-delta encoding of various types (int, float) and bit widths. A base
|
|
// value and timestamp and a base delta for each is saved in the header. The
|
|
// payload consists of double-deltas, i.e. deviations from the values and
|
|
// timestamps calculated by applying the base value and time and the base deltas.
|
|
// However, once 8 bytes would be needed to encode a double-delta value, a
|
|
// fall-back to the absolute numbers happens (so that timestamps are saved
|
|
// directly as int64 and values as float64).
|
|
// doubleDeltaEncodedChunk implements the chunk interface.
|
|
type doubleDeltaEncodedChunk []byte
|
|
|
|
// newDoubleDeltaEncodedChunk returns a newly allocated doubleDeltaEncodedChunk.
|
|
func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doubleDeltaEncodedChunk {
|
|
if tb < 1 {
|
|
panic("need at least 1 time delta byte")
|
|
}
|
|
if length < doubleDeltaHeaderBytes+16 {
|
|
panic(fmt.Errorf(
|
|
"chunk length %d bytes is insufficient, need at least %d",
|
|
length, doubleDeltaHeaderBytes+16,
|
|
))
|
|
}
|
|
c := make(doubleDeltaEncodedChunk, doubleDeltaHeaderIsIntOffset+1, length)
|
|
|
|
c[doubleDeltaHeaderTimeBytesOffset] = byte(tb)
|
|
c[doubleDeltaHeaderValueBytesOffset] = byte(vb)
|
|
if vb < d8 && isInt { // Only use int for fewer than 8 value double-delta bytes.
|
|
c[doubleDeltaHeaderIsIntOffset] = 1
|
|
} else {
|
|
c[doubleDeltaHeaderIsIntOffset] = 0
|
|
}
|
|
return &c
|
|
}
|
|
|
|
// add implements chunk.
|
|
func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
|
|
// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
|
|
if c.len() == 0 {
|
|
return c.addFirstSample(s), nil
|
|
}
|
|
|
|
tb := c.timeBytes()
|
|
vb := c.valueBytes()
|
|
|
|
if c.len() == 1 {
|
|
return c.addSecondSample(s, tb, vb)
|
|
}
|
|
|
|
remainingBytes := cap(c) - len(c)
|
|
sampleSize := c.sampleSize()
|
|
|
|
// Do we generally have space for another sample in this chunk? If not,
|
|
// overflow into a new one.
|
|
if remainingBytes < sampleSize {
|
|
return addToOverflowChunk(&c, s)
|
|
}
|
|
|
|
projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta()
|
|
ddt := s.Timestamp - projectedTime
|
|
|
|
projectedValue := c.baseValue() + model.SampleValue(c.len())*c.baseValueDelta()
|
|
ddv := s.Value - projectedValue
|
|
|
|
ntb, nvb, nInt := tb, vb, c.isInt()
|
|
// If the new sample is incompatible with the current encoding, reencode the
|
|
// existing chunk data into new chunk(s).
|
|
if c.isInt() && !isInt64(ddv) {
|
|
// int->float.
|
|
nvb = d4
|
|
nInt = false
|
|
} else if !c.isInt() && vb == d4 && projectedValue+model.SampleValue(float32(ddv)) != s.Value {
|
|
// float32->float64.
|
|
nvb = d8
|
|
} else {
|
|
if tb < d8 {
|
|
// Maybe more bytes for timestamp.
|
|
ntb = max(tb, bytesNeededForSignedTimestampDelta(ddt))
|
|
}
|
|
if c.isInt() && vb < d8 {
|
|
// Maybe more bytes for sample value.
|
|
nvb = max(vb, bytesNeededForIntegerSampleValueDelta(ddv))
|
|
}
|
|
}
|
|
if tb != ntb || vb != nvb || c.isInt() != nInt {
|
|
if len(c)*2 < cap(c) {
|
|
return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
|
|
}
|
|
// Chunk is already half full. Better create a new one and save the transcoding efforts.
|
|
return addToOverflowChunk(&c, s)
|
|
}
|
|
|
|
offset := len(c)
|
|
c = c[:offset+sampleSize]
|
|
|
|
switch tb {
|
|
case d1:
|
|
c[offset] = byte(ddt)
|
|
case d2:
|
|
binary.LittleEndian.PutUint16(c[offset:], uint16(ddt))
|
|
case d4:
|
|
binary.LittleEndian.PutUint32(c[offset:], uint32(ddt))
|
|
case d8:
|
|
// Store the absolute value (no delta) in case of d8.
|
|
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
|
|
default:
|
|
return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
|
|
}
|
|
|
|
offset += int(tb)
|
|
|
|
if c.isInt() {
|
|
switch vb {
|
|
case d0:
|
|
// No-op. Constant delta is stored as base value.
|
|
case d1:
|
|
c[offset] = byte(int8(ddv))
|
|
case d2:
|
|
binary.LittleEndian.PutUint16(c[offset:], uint16(int16(ddv)))
|
|
case d4:
|
|
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv)))
|
|
// d8 must not happen. Those samples are encoded as float64.
|
|
default:
|
|
return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
|
|
}
|
|
} else {
|
|
switch vb {
|
|
case d4:
|
|
binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv)))
|
|
case d8:
|
|
// Store the absolute value (no delta) in case of d8.
|
|
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
|
|
default:
|
|
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
|
|
}
|
|
}
|
|
return []Chunk{&c}, nil
|
|
}
|
|
|
|
// clone implements chunk.
|
|
func (c doubleDeltaEncodedChunk) Clone() Chunk {
|
|
clone := make(doubleDeltaEncodedChunk, len(c), cap(c))
|
|
copy(clone, c)
|
|
return &clone
|
|
}
|
|
|
|
// FirstTime implements chunk.
|
|
func (c doubleDeltaEncodedChunk) FirstTime() model.Time {
|
|
return c.baseTime()
|
|
}
|
|
|
|
// NewIterator( implements chunk.
|
|
func (c *doubleDeltaEncodedChunk) NewIterator() Iterator {
|
|
return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{
|
|
c: *c,
|
|
baseT: c.baseTime(),
|
|
baseΔT: c.baseTimeDelta(),
|
|
baseV: c.baseValue(),
|
|
baseΔV: c.baseValueDelta(),
|
|
tBytes: c.timeBytes(),
|
|
vBytes: c.valueBytes(),
|
|
isInt: c.isInt(),
|
|
})
|
|
}
|
|
|
|
// marshal implements chunk.
|
|
func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error {
|
|
if len(c) > math.MaxUint16 {
|
|
panic("chunk buffer length would overflow a 16 bit uint")
|
|
}
|
|
binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
|
|
|
|
n, err := w.Write(c[:cap(c)])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if n != cap(c) {
|
|
return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MarshalToBuf implements chunk.
|
|
func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error {
|
|
if len(c) > math.MaxUint16 {
|
|
panic("chunk buffer length would overflow a 16 bit uint")
|
|
}
|
|
binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
|
|
|
|
n := copy(buf, c)
|
|
if n != len(c) {
|
|
return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// unmarshal implements chunk.
|
|
func (c *doubleDeltaEncodedChunk) Unmarshal(r io.Reader) error {
|
|
*c = (*c)[:cap(*c)]
|
|
if _, err := io.ReadFull(r, *c); err != nil {
|
|
return err
|
|
}
|
|
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
|
|
if int(l) > cap(*c) {
|
|
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
|
|
}
|
|
if int(l) < doubleDeltaHeaderMinBytes {
|
|
return fmt.Errorf("chunk length less than header size: %d < %d", l, doubleDeltaHeaderMinBytes)
|
|
}
|
|
|
|
*c = (*c)[:l]
|
|
return nil
|
|
}
|
|
|
|
// unmarshalFromBuf implements chunk.
|
|
func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
|
|
*c = (*c)[:cap(*c)]
|
|
copy(*c, buf)
|
|
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
|
|
if int(l) > cap(*c) {
|
|
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
|
|
}
|
|
if int(l) < doubleDeltaHeaderMinBytes {
|
|
return fmt.Errorf("chunk length less than header size: %d < %d", l, doubleDeltaHeaderMinBytes)
|
|
}
|
|
*c = (*c)[:l]
|
|
return nil
|
|
}
|
|
|
|
// encoding implements chunk.
|
|
func (c doubleDeltaEncodedChunk) Encoding() Encoding { return DoubleDelta }
|
|
|
|
func (c doubleDeltaEncodedChunk) baseTime() model.Time {
|
|
return model.Time(
|
|
binary.LittleEndian.Uint64(
|
|
c[doubleDeltaHeaderBaseTimeOffset:],
|
|
),
|
|
)
|
|
}
|
|
|
|
func (c doubleDeltaEncodedChunk) baseValue() model.SampleValue {
|
|
return model.SampleValue(
|
|
math.Float64frombits(
|
|
binary.LittleEndian.Uint64(
|
|
c[doubleDeltaHeaderBaseValueOffset:],
|
|
),
|
|
),
|
|
)
|
|
}
|
|
|
|
func (c doubleDeltaEncodedChunk) baseTimeDelta() model.Time {
|
|
if len(c) < doubleDeltaHeaderBaseTimeDeltaOffset+8 {
|
|
return 0
|
|
}
|
|
return model.Time(
|
|
binary.LittleEndian.Uint64(
|
|
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
|
|
),
|
|
)
|
|
}
|
|
|
|
func (c doubleDeltaEncodedChunk) baseValueDelta() model.SampleValue {
|
|
if len(c) < doubleDeltaHeaderBaseValueDeltaOffset+8 {
|
|
return 0
|
|
}
|
|
return model.SampleValue(
|
|
math.Float64frombits(
|
|
binary.LittleEndian.Uint64(
|
|
c[doubleDeltaHeaderBaseValueDeltaOffset:],
|
|
),
|
|
),
|
|
)
|
|
}
|
|
|
|
func (c doubleDeltaEncodedChunk) timeBytes() deltaBytes {
|
|
return deltaBytes(c[doubleDeltaHeaderTimeBytesOffset])
|
|
}
|
|
|
|
func (c doubleDeltaEncodedChunk) valueBytes() deltaBytes {
|
|
return deltaBytes(c[doubleDeltaHeaderValueBytesOffset])
|
|
}
|
|
|
|
func (c doubleDeltaEncodedChunk) sampleSize() int {
|
|
return int(c.timeBytes() + c.valueBytes())
|
|
}
|
|
|
|
func (c doubleDeltaEncodedChunk) len() int {
|
|
if len(c) <= doubleDeltaHeaderIsIntOffset+1 {
|
|
return 0
|
|
}
|
|
if len(c) <= doubleDeltaHeaderBaseValueOffset+8 {
|
|
return 1
|
|
}
|
|
return (len(c)-doubleDeltaHeaderBytes)/c.sampleSize() + 2
|
|
}
|
|
|
|
func (c doubleDeltaEncodedChunk) isInt() bool {
|
|
return c[doubleDeltaHeaderIsIntOffset] == 1
|
|
}
|
|
|
|
// addFirstSample is a helper method only used by c.add(). It adds timestamp and
|
|
// value as base time and value.
|
|
func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []Chunk {
|
|
c = c[:doubleDeltaHeaderBaseValueOffset+8]
|
|
binary.LittleEndian.PutUint64(
|
|
c[doubleDeltaHeaderBaseTimeOffset:],
|
|
uint64(s.Timestamp),
|
|
)
|
|
binary.LittleEndian.PutUint64(
|
|
c[doubleDeltaHeaderBaseValueOffset:],
|
|
math.Float64bits(float64(s.Value)),
|
|
)
|
|
return []Chunk{&c}
|
|
}
|
|
|
|
// addSecondSample is a helper method only used by c.add(). It calculates the
|
|
// base delta from the provided sample and adds it to the chunk.
|
|
func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]Chunk, error) {
|
|
baseTimeDelta := s.Timestamp - c.baseTime()
|
|
if baseTimeDelta < 0 {
|
|
return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta)
|
|
}
|
|
c = c[:doubleDeltaHeaderBytes]
|
|
if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
|
|
// If already the base delta needs d8 (or we are at d8
|
|
// already, anyway), we better encode this timestamp
|
|
// directly rather than as a delta and switch everything
|
|
// to d8.
|
|
c[doubleDeltaHeaderTimeBytesOffset] = byte(d8)
|
|
binary.LittleEndian.PutUint64(
|
|
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
|
|
uint64(s.Timestamp),
|
|
)
|
|
} else {
|
|
binary.LittleEndian.PutUint64(
|
|
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
|
|
uint64(baseTimeDelta),
|
|
)
|
|
}
|
|
baseValue := c.baseValue()
|
|
baseValueDelta := s.Value - baseValue
|
|
if vb >= d8 || baseValue+baseValueDelta != s.Value {
|
|
// If we can't reproduce the original sample value (or
|
|
// if we are at d8 already, anyway), we better encode
|
|
// this value directly rather than as a delta and switch
|
|
// everything to d8.
|
|
c[doubleDeltaHeaderValueBytesOffset] = byte(d8)
|
|
c[doubleDeltaHeaderIsIntOffset] = 0
|
|
binary.LittleEndian.PutUint64(
|
|
c[doubleDeltaHeaderBaseValueDeltaOffset:],
|
|
math.Float64bits(float64(s.Value)),
|
|
)
|
|
} else {
|
|
binary.LittleEndian.PutUint64(
|
|
c[doubleDeltaHeaderBaseValueDeltaOffset:],
|
|
math.Float64bits(float64(baseValueDelta)),
|
|
)
|
|
}
|
|
return []Chunk{&c}, nil
|
|
}
|
|
|
|
// doubleDeltaEncodedIndexAccessor implements indexAccessor.
|
|
type doubleDeltaEncodedIndexAccessor struct {
|
|
c doubleDeltaEncodedChunk
|
|
baseT, baseΔT model.Time
|
|
baseV, baseΔV model.SampleValue
|
|
tBytes, vBytes deltaBytes
|
|
isInt bool
|
|
lastErr error
|
|
}
|
|
|
|
func (acc *doubleDeltaEncodedIndexAccessor) err() error {
|
|
return acc.lastErr
|
|
}
|
|
|
|
func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time {
|
|
if idx == 0 {
|
|
return acc.baseT
|
|
}
|
|
if idx == 1 {
|
|
// If time bytes are at d8, the time is saved directly rather
|
|
// than as a difference.
|
|
if acc.tBytes == d8 {
|
|
return acc.baseΔT
|
|
}
|
|
return acc.baseT + acc.baseΔT
|
|
}
|
|
|
|
offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes)
|
|
|
|
switch acc.tBytes {
|
|
case d1:
|
|
return acc.baseT +
|
|
model.Time(idx)*acc.baseΔT +
|
|
model.Time(int8(acc.c[offset]))
|
|
case d2:
|
|
return acc.baseT +
|
|
model.Time(idx)*acc.baseΔT +
|
|
model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
|
|
case d4:
|
|
return acc.baseT +
|
|
model.Time(idx)*acc.baseΔT +
|
|
model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
|
|
case d8:
|
|
// Take absolute value for d8.
|
|
return model.Time(binary.LittleEndian.Uint64(acc.c[offset:]))
|
|
default:
|
|
acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes)
|
|
return model.Earliest
|
|
}
|
|
}
|
|
|
|
func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue {
|
|
if idx == 0 {
|
|
return acc.baseV
|
|
}
|
|
if idx == 1 {
|
|
// If value bytes are at d8, the value is saved directly rather
|
|
// than as a difference.
|
|
if acc.vBytes == d8 {
|
|
return acc.baseΔV
|
|
}
|
|
return acc.baseV + acc.baseΔV
|
|
}
|
|
|
|
offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes)
|
|
|
|
if acc.isInt {
|
|
switch acc.vBytes {
|
|
case d0:
|
|
return acc.baseV +
|
|
model.SampleValue(idx)*acc.baseΔV
|
|
case d1:
|
|
return acc.baseV +
|
|
model.SampleValue(idx)*acc.baseΔV +
|
|
model.SampleValue(int8(acc.c[offset]))
|
|
case d2:
|
|
return acc.baseV +
|
|
model.SampleValue(idx)*acc.baseΔV +
|
|
model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
|
|
case d4:
|
|
return acc.baseV +
|
|
model.SampleValue(idx)*acc.baseΔV +
|
|
model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
|
|
// No d8 for ints.
|
|
default:
|
|
acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
|
|
return 0
|
|
}
|
|
} else {
|
|
switch acc.vBytes {
|
|
case d4:
|
|
return acc.baseV +
|
|
model.SampleValue(idx)*acc.baseΔV +
|
|
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:])))
|
|
case d8:
|
|
// Take absolute value for d8.
|
|
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:])))
|
|
default:
|
|
acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
|
|
return 0
|
|
}
|
|
}
|
|
}
|