mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1196 lines
38 KiB
1196 lines
38 KiB
// Copyright 2016 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package local
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
|
|
"github.com/prometheus/common/model"
|
|
)
|
|
|
|
// The varbit chunk encoding is broadly similar to the double-delta
|
|
// chunks. However, it uses a number of different bit-widths to save the
|
|
// double-deltas (rather than 1, 2, or 4 bytes). Also, it doesn't use the delta
|
|
// of the first two samples of a chunk as the base delta, but uses a "sliding"
|
|
// delta, i.e. the delta of the two previous samples. Both differences make
|
|
// random access more expensive. Sample values can be encoded with the same
|
|
// double-delta scheme as timestamps, but different value encodings can be
|
|
// chosen adaptively, among them XOR encoding and "zero" encoding for constant
|
|
// sample values. Overall, the varbit encoding results in a much better
|
|
// compression ratio (~1.3 bytes per sample compared to ~3.3 bytes per sample
|
|
// with double-delta encoding, for typical data sets).
|
|
//
|
|
// Major parts of the varbit encoding are inspired by the following paper:
|
|
// Gorilla: A Fast, Scalable, In-Memory Time Series Database
|
|
// T. Pelkonen et al., Facebook Inc.
|
|
// http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
|
|
// Note that there are significant differences, some due to the way Prometheus
|
|
// chunks work, others to optimize for the Prometheus use-case.
|
|
//
|
|
// Layout of a 1024 byte varbit chunk (big endian, wherever it matters):
|
|
// - first time (int64): 8 bytes bit 0000-0063
|
|
// - first value (float64): 8 bytes bit 0064-0127
|
|
// - last time (int64): 8 bytes bit 0128-0191
|
|
// - last value (float64): 8 bytes bit 0192-0255
|
|
// - first Δt (t1-t0, unsigned): 3 bytes bit 0256-0279
|
|
// - flags (byte) 1 byte bit 0280-0287
|
|
// - bit offset for next sample 2 bytes bit 0288-0303
|
|
// - first Δv for value encoding 1, otherwise payload
|
|
// 4 bytes bit 0304-0335
|
|
// - payload 973 bytes bit 0336-8119
|
|
// The following only exists if the chunk is still open. Otherwise, it might be
|
|
// used by payload.
|
|
// - bit offset for current ΔΔt=0 count 2 bytes bit 8120-8135
|
|
// - last Δt 3 bytes bit 8136-8159
|
|
// - special bytes for value encoding 4 bytes bit 8160-8191
|
|
// - for encoding 1: last Δv 4 bytes bit 8160-8191
|
|
// - for encoding 2: count of
|
|
// - last leading zeros (1 byte) 1 byte bit 8160-8167
|
|
// - last significant bits (1 byte) 1 byte bit 8168-8175
|
|
//
|
|
// FLAGS
|
|
//
|
|
// The two least significant bits of the flags byte define the value encoding
|
|
// for the whole chunk, see below. The most significant bit of the flags byte
|
|
// is set if the chunk is closed. No samples can be added anymore to a closed
|
|
// chunk. Furthermore, the last value of a closed chunk is only saved in the
|
|
// header (last time, last value), while in a chunk that is still open, the last
|
|
// sample in the payload is the same sample as saved in the header.
|
|
//
|
|
// The remaining bits in the flags byte are currently unused.
|
|
//
|
|
// TIMESTAMP ENCODING
|
|
//
|
|
// The 1st timestamp is saved directly.
|
|
//
|
|
// The difference to the 2nd timestamp is saved as first Δt. 3 bytes is enough
|
|
// for about 4.5h. Since we close a chunk after sitting idle for 1h, this
|
|
// limitation has no practical consequences. Should, for whatever reason, a
|
|
// larger delta be required, the chunk would be closed, i.e. the new sample is
|
|
// added as the last sample to the chunk, and the next sample will be added to a
|
|
// new chunk.
|
|
//
|
|
// From the 3rd timestamp on, a double-delta (ΔΔt) is saved:
|
|
//   (t_{n} - t_{n-1}) - (t_{n-1} - t_{n-2})
|
|
// To perform that operation, the last Δt is saved at the end of the chunk for
|
|
// as long as the chunk is not closed yet (see above).
|
|
//
|
|
// Most of the time, ΔΔt is zero, even with the ms-precision of
|
|
// Prometheus. Therefore, we save a ΔΔt of zero as a leading '0' bit followed by
|
|
// 7 bits counting the number of consecutive ΔΔt==0 (the count is offset by -1,
|
|
// so the range of 0 to 127 represents 1 to 128 repetitions).
|
|
//
|
|
// If ΔΔt != 0, we essentially apply the Gorilla encoding scheme (cf. section
|
|
// 4.1.1 in the paper) but with different bit buckets as Prometheus uses ms
|
|
// rather than s, and the default scrape interval is 1m rather than 4m. In
|
|
// particular:
|
|
//
|
|
// - If ΔΔt is between [-32,31], store '10' followed by a 6 bit value. This is
|
|
// for minor irregularities in the scrape interval.
|
|
//
|
|
// - If ΔΔt is between [-65536,65535], store '110' followed by a 17 bit
|
|
// value. This will typically happen if a scrape is missed completely.
|
|
//
|
|
// - If ΔΔt is between [-4194304,4194303], store '111' followed by a 23 bit
|
|
// value. This spans more than 1h, which is usually enough as we close a
|
|
// chunk anyway if it doesn't receive any sample in 1h.
|
|
//
|
|
// - Should we nevertheless encounter a larger ΔΔt, we simply close the chunk,
|
|
// add the new sample as the last of the chunk, and add subsequent samples to
|
|
// a new chunk.
|
|
//
|
|
// VALUE ENCODING
|
|
//
|
|
// Value encoding can change and is determined by the two least significant bits
|
|
// of the 'flags' byte at bit position 280. The encoding can be changed without
|
|
// transcoding upon adding the 3rd sample. After that, an encoding change
|
|
// results either in transcoding or in closing the chunk.
|
|
//
|
|
// The 1st sample value is always saved directly. The 2nd sample value is saved
|
|
// in the header as the last value. Upon saving the 3rd value, an encoding is
|
|
// chosen, and the chunk is prepared accordingly.
|
|
//
|
|
// The following value encodings exist (with their value in the flags byte):
|
|
//
|
|
// 0: "Zero encoding".
|
|
//
|
|
// In many time series, the value simply stays constant over a long time
|
|
// (e.g. the "up" time series). In that case, all sample values are determined
|
|
// by the 1st value, and no further value encoding is happening at all. The
|
|
// payload consists entirely of timestamps.
|
|
//
|
|
// 1: Integer double-delta encoding.
|
|
//
|
|
// Many Prometheus metrics are integer counters and change in a quite regular
|
|
// fashion, similar to timestamps. Thus, the same double-delta encoding can be
|
|
// applied. This encoding works like the timestamp encoding described above, but
|
|
// with different bit buckets and without counting of repeated ΔΔv=0. The case
|
|
// of ΔΔv=0 is represented by a single '0' bit for each occurrence. The first Δv
|
|
// is saved as an int32 at bit position 304. The most recent Δv is saved as an
|
|
// int32 at the end of the chunk (see above). If Δv cannot be represented as a
|
|
// 32 bit signed integer, no integer double-delta encoding can be applied.
|
|
//
|
|
// Bit buckets (lead-in bits followed by (signed) value bits):
|
|
// - '0': 0 bit
|
|
// - '10': 6 bit
|
|
// - '110': 13 bit
|
|
// - '1110': 20 bit
|
|
// - '1111': 33 bit
|
|
// Since Δv is restricted to 32 bit, 33 bit are always enough for ΔΔv.
|
|
//
|
|
// 2: XOR encoding.
|
|
//
|
|
// This follows almost precisely the Gorilla value encoding (cf. section 4.1.2
|
|
// of the paper). The last count of leading zeros and the last count of
|
|
// meaningful bits in the XOR value is saved at the end of the chunk for as long
|
|
// as the chunk is not closed yet (see above). Note, though, that the number of
|
|
// significant bits is saved as (count-1), i.e. a saved value of 0 means 1
|
|
// significant bit, a saved value of 1 means 2, and so on. Also, we save the
|
|
// numbers of leading zeros and significant bits anew if they drop a
|
|
// lot. Otherwise, you can easily be locked in with a high number of significant
|
|
// bits.
|
|
//
|
|
// 3: Direct encoding.
|
|
//
|
|
// If the sample values are just random, it is most efficient to save sample
|
|
// values directly as float64.
|
|
//
|
|
// ZIPPING TIMESTAMPS AND VALUES TOGETHER
|
|
//
|
|
// Usually, encoded timestamps and encoded values simply alternate. There are
|
|
// two exceptions:
|
|
//
|
|
// (1) With the "zero encoding" for values, the payload only contains
|
|
// timestamps.
|
|
//
|
|
// (2) In a consecutive row of up to 128 ΔΔt=0 repeats, the count of timestamps
|
|
// determines how many sample values will follow directly after another.
|
|
|
|
const (
|
|
varbitMinLength = 128
|
|
varbitMaxLength = 8191
|
|
|
|
// Useful byte offsets.
|
|
varbitFirstTimeOffset = 0
|
|
varbitFirstValueOffset = 8
|
|
varbitLastTimeOffset = 16
|
|
varbitLastValueOffset = 24
|
|
varbitFirstTimeDeltaOffset = 32
|
|
varbitFlagOffset = 35
|
|
varbitNextSampleBitOffsetOffset = 36
|
|
varbitFirstValueDeltaOffset = 38
|
|
// The following are in the "footer" and only usable if the chunk is
|
|
// still open.
|
|
varbitCountOffsetBitOffset = chunkLen - 9
|
|
varbitLastTimeDeltaOffset = chunkLen - 7
|
|
varbitLastValueDeltaOffset = chunkLen - 4
|
|
varbitLastLeadingZerosCountOffset = chunkLen - 4
|
|
varbitLastSignificantBitsCountOffset = chunkLen - 3
|
|
|
|
varbitFirstSampleBitOffset uint16 = 0 // Symbolic, don't really read or write here.
|
|
varbitSecondSampleBitOffset uint16 = 1 // Symbolic, don't really read or write here.
|
|
// varbitThirdSampleBitOffset is a bit special. Depending on the encoding, there can
|
|
// be various things at this offset. It's most of the time symbolic, but in the best
|
|
// case (zero encoding for values), it will be the real offset for the 3rd sample.
|
|
varbitThirdSampleBitOffset uint16 = varbitFirstValueDeltaOffset * 8
|
|
|
|
// If the bit offset for the next sample is above this threshold, no new
|
|
// samples can be added to the chunk's payload (because the payload has
|
|
// already reached the footer). However, one more sample can be saved in
|
|
// the header as the last sample.
|
|
varbitNextSampleBitOffsetThreshold = 8 * varbitCountOffsetBitOffset
|
|
|
|
varbitMaxTimeDelta = 1 << 24 // What fits into a 3-byte timestamp.
|
|
)
|
|
|
|
// varbitValueEncoding identifies the scheme used to encode the sample values
// of a varbit chunk. It is kept in the two least significant bits of the
// chunk's flags byte.
type varbitValueEncoding byte

// The value encodings, spelled out with the numeric value each one takes in
// the flags byte.
const (
	varbitZeroEncoding           varbitValueEncoding = 0
	varbitIntDoubleDeltaEncoding varbitValueEncoding = 1
	varbitXOREncoding            varbitValueEncoding = 2
	varbitDirectEncoding         varbitValueEncoding = 3
)
|
|
|
|
// varbitWorstCaseBitsPerSample provides the worst-case number of bits needed
|
|
// per sample with the various value encodings. The counts already include the
|
|
// up to 27 bits taken by a timestamp.
|
|
var varbitWorstCaseBitsPerSample = map[varbitValueEncoding]int{
|
|
varbitZeroEncoding: 27 + 0,
|
|
varbitIntDoubleDeltaEncoding: 27 + 38,
|
|
varbitXOREncoding: 27 + 13 + 64,
|
|
varbitDirectEncoding: 27 + 64,
|
|
}
|
|
|
|
// varbitChunk is the raw byte representation of a varbit-encoded chunk
// (header, payload and, while the chunk is open, footer). It implements the
// chunk interface.
type varbitChunk []byte
|
|
|
|
// newVarbitChunk returns a newly allocated varbitChunk. For simplicity, all
|
|
// varbit chunks must have the length as determined by the chunkLen constant.
|
|
func newVarbitChunk(enc varbitValueEncoding) *varbitChunk {
|
|
if chunkLen < varbitMinLength || chunkLen > varbitMaxLength {
|
|
panic(fmt.Errorf(
|
|
"invalid chunk length of %d bytes, need at least %d bytes and at most %d bytes",
|
|
chunkLen, varbitMinLength, varbitMaxLength,
|
|
))
|
|
}
|
|
if enc > varbitDirectEncoding {
|
|
panic(fmt.Errorf("unknown varbit value encoding: %v", enc))
|
|
}
|
|
c := make(varbitChunk, chunkLen)
|
|
c.setValueEncoding(enc)
|
|
return &c
|
|
}
|
|
|
|
// add implements chunk.
|
|
func (c *varbitChunk) add(s model.SamplePair) ([]chunk, error) {
|
|
offset := c.nextSampleOffset()
|
|
switch {
|
|
case c.closed():
|
|
return addToOverflowChunk(c, s)
|
|
case offset > varbitNextSampleBitOffsetThreshold:
|
|
return c.addLastSample(s), nil
|
|
case offset == varbitFirstSampleBitOffset:
|
|
return c.addFirstSample(s), nil
|
|
case offset == varbitSecondSampleBitOffset:
|
|
return c.addSecondSample(s)
|
|
}
|
|
return c.addLaterSample(s, offset)
|
|
}
|
|
|
|
// clone implements chunk.
|
|
func (c varbitChunk) clone() chunk {
|
|
clone := make(varbitChunk, len(c))
|
|
copy(clone, c)
|
|
return &clone
|
|
}
|
|
|
|
// newIterator implements chunk.
|
|
func (c varbitChunk) newIterator() chunkIterator {
|
|
return newVarbitChunkIterator(c)
|
|
}
|
|
|
|
// marshal implements chunk.
|
|
func (c varbitChunk) marshal(w io.Writer) error {
|
|
n, err := w.Write(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if n != cap(c) {
|
|
return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// marshalToBuf implements chunk.
|
|
func (c varbitChunk) marshalToBuf(buf []byte) error {
|
|
n := copy(buf, c)
|
|
if n != len(c) {
|
|
return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// unmarshal implements chunk.
|
|
func (c varbitChunk) unmarshal(r io.Reader) error {
|
|
_, err := io.ReadFull(r, c)
|
|
return err
|
|
}
|
|
|
|
// unmarshalFromBuf implements chunk.
|
|
func (c varbitChunk) unmarshalFromBuf(buf []byte) error {
|
|
if copied := copy(c, buf); copied != cap(c) {
|
|
return fmt.Errorf("insufficient bytes copied from buffer during unmarshaling, want %d, got %d", cap(c), copied)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// encoding implements chunk.
|
|
func (c varbitChunk) encoding() chunkEncoding { return varbit }
|
|
|
|
// firstTime implements chunk.
|
|
func (c varbitChunk) firstTime() model.Time {
|
|
return model.Time(
|
|
binary.BigEndian.Uint64(
|
|
c[varbitFirstTimeOffset:],
|
|
),
|
|
)
|
|
}
|
|
|
|
func (c varbitChunk) firstValue() model.SampleValue {
|
|
return model.SampleValue(
|
|
math.Float64frombits(
|
|
binary.BigEndian.Uint64(
|
|
c[varbitFirstValueOffset:],
|
|
),
|
|
),
|
|
)
|
|
}
|
|
|
|
func (c varbitChunk) lastTime() model.Time {
|
|
return model.Time(
|
|
binary.BigEndian.Uint64(
|
|
c[varbitLastTimeOffset:],
|
|
),
|
|
)
|
|
}
|
|
|
|
func (c varbitChunk) lastValue() model.SampleValue {
|
|
return model.SampleValue(
|
|
math.Float64frombits(
|
|
binary.BigEndian.Uint64(
|
|
c[varbitLastValueOffset:],
|
|
),
|
|
),
|
|
)
|
|
}
|
|
|
|
func (c varbitChunk) firstTimeDelta() model.Time {
|
|
// Only the first 3 bytes are actually the timestamp, so get rid of the
|
|
// last one by bitshifting.
|
|
return model.Time(c[varbitFirstTimeDeltaOffset+2]) |
|
|
model.Time(c[varbitFirstTimeDeltaOffset+1])<<8 |
|
|
model.Time(c[varbitFirstTimeDeltaOffset])<<16
|
|
}
|
|
|
|
// firstValueDelta returns an undefined result if the encoding type is not 1.
|
|
func (c varbitChunk) firstValueDelta() int32 {
|
|
return int32(binary.BigEndian.Uint32(c[varbitFirstValueDeltaOffset:]))
|
|
}
|
|
|
|
// lastTimeDelta returns an undefined result if the chunk is closed already.
|
|
func (c varbitChunk) lastTimeDelta() model.Time {
|
|
return model.Time(c[varbitLastTimeDeltaOffset+2]) |
|
|
model.Time(c[varbitLastTimeDeltaOffset+1])<<8 |
|
|
model.Time(c[varbitLastTimeDeltaOffset])<<16
|
|
}
|
|
|
|
// setLastTimeDelta must not be called if the chunk is closed already. It most
|
|
// not be called with a time that doesn't fit into 24bit, either.
|
|
func (c varbitChunk) setLastTimeDelta(dT model.Time) {
|
|
if dT > varbitMaxTimeDelta {
|
|
panic("Δt overflows 24 bit")
|
|
}
|
|
c[varbitLastTimeDeltaOffset] = byte(dT >> 16)
|
|
c[varbitLastTimeDeltaOffset+1] = byte(dT >> 8)
|
|
c[varbitLastTimeDeltaOffset+2] = byte(dT)
|
|
}
|
|
|
|
// lastValueDelta returns an undefined result if the chunk is closed already.
|
|
func (c varbitChunk) lastValueDelta() int32 {
|
|
return int32(binary.BigEndian.Uint32(c[varbitLastValueDeltaOffset:]))
|
|
}
|
|
|
|
// setLastValueDelta must not be called if the chunk is closed already.
|
|
func (c varbitChunk) setLastValueDelta(dV int32) {
|
|
binary.BigEndian.PutUint32(c[varbitLastValueDeltaOffset:], uint32(dV))
|
|
}
|
|
|
|
func (c varbitChunk) nextSampleOffset() uint16 {
|
|
return binary.BigEndian.Uint16(c[varbitNextSampleBitOffsetOffset:])
|
|
}
|
|
|
|
func (c varbitChunk) setNextSampleOffset(offset uint16) {
|
|
binary.BigEndian.PutUint16(c[varbitNextSampleBitOffsetOffset:], offset)
|
|
}
|
|
|
|
func (c varbitChunk) valueEncoding() varbitValueEncoding {
|
|
return varbitValueEncoding(c[varbitFlagOffset] & 0x03)
|
|
}
|
|
|
|
func (c varbitChunk) setValueEncoding(enc varbitValueEncoding) {
|
|
if enc > varbitDirectEncoding {
|
|
panic("invalid varbit value encoding")
|
|
}
|
|
c[varbitFlagOffset] &^= 0x03 // Clear.
|
|
c[varbitFlagOffset] |= byte(enc) // Set.
|
|
}
|
|
|
|
func (c varbitChunk) closed() bool {
|
|
return c[varbitFlagOffset] > 0x7F // Most significant bit set.
|
|
}
|
|
|
|
func (c varbitChunk) zeroDDTRepeats() (repeats uint64, offset uint16) {
|
|
offset = binary.BigEndian.Uint16(c[varbitCountOffsetBitOffset:])
|
|
if offset == 0 {
|
|
return 0, 0
|
|
}
|
|
return c.readBitPattern(offset, 7) + 1, offset
|
|
}
|
|
|
|
func (c varbitChunk) setZeroDDTRepeats(repeats uint64, offset uint16) {
|
|
switch repeats {
|
|
case 0:
|
|
// Just clear the offset.
|
|
binary.BigEndian.PutUint16(c[varbitCountOffsetBitOffset:], 0)
|
|
return
|
|
case 1:
|
|
// First time we set a repeat here, so set the offset. But only
|
|
// if we haven't reached the footer yet. (If that's the case, we
|
|
// would overwrite ourselves below, and we don't need the offset
|
|
// later anyway because no more samples will be added to this
|
|
// chunk.)
|
|
if offset+7 <= varbitNextSampleBitOffsetThreshold {
|
|
binary.BigEndian.PutUint16(c[varbitCountOffsetBitOffset:], offset)
|
|
}
|
|
default:
|
|
// For a change, we are writing somewhere where we have written
|
|
// before. We need to clear the bits first.
|
|
posIn1stByte := offset % 8
|
|
c[offset/8] &^= bitMask[7][posIn1stByte]
|
|
if posIn1stByte > 1 {
|
|
c[offset/8+1] &^= bitMask[posIn1stByte-1][0]
|
|
}
|
|
}
|
|
c.addBitPattern(offset, repeats-1, 7)
|
|
}
|
|
|
|
func (c varbitChunk) setLastSample(s model.SamplePair) {
|
|
binary.BigEndian.PutUint64(
|
|
c[varbitLastTimeOffset:],
|
|
uint64(s.Timestamp),
|
|
)
|
|
binary.BigEndian.PutUint64(
|
|
c[varbitLastValueOffset:],
|
|
math.Float64bits(float64(s.Value)),
|
|
)
|
|
}
|
|
|
|
// addFirstSample is a helper method only used by c.add(). It adds timestamp and
|
|
// value as base time and value.
|
|
func (c *varbitChunk) addFirstSample(s model.SamplePair) []chunk {
|
|
binary.BigEndian.PutUint64(
|
|
(*c)[varbitFirstTimeOffset:],
|
|
uint64(s.Timestamp),
|
|
)
|
|
binary.BigEndian.PutUint64(
|
|
(*c)[varbitFirstValueOffset:],
|
|
math.Float64bits(float64(s.Value)),
|
|
)
|
|
c.setLastSample(s) // To simplify handling of single-sample chunks.
|
|
c.setNextSampleOffset(varbitSecondSampleBitOffset)
|
|
return []chunk{c}
|
|
}
|
|
|
|
// addSecondSample is a helper method only used by c.add(). It calculates the
|
|
// first time delta from the provided sample and adds it to the chunk together
|
|
// with the provided sample as the last sample.
|
|
func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]chunk, error) {
|
|
firstTimeDelta := s.Timestamp - c.firstTime()
|
|
if firstTimeDelta < 0 {
|
|
return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta)
|
|
}
|
|
if firstTimeDelta > varbitMaxTimeDelta {
|
|
// A time delta too great. Still, we can add it as a last sample
|
|
// before overflowing.
|
|
return c.addLastSample(s), nil
|
|
}
|
|
(*c)[varbitFirstTimeDeltaOffset] = byte(firstTimeDelta >> 16)
|
|
(*c)[varbitFirstTimeDeltaOffset+1] = byte(firstTimeDelta >> 8)
|
|
(*c)[varbitFirstTimeDeltaOffset+2] = byte(firstTimeDelta)
|
|
|
|
// Also set firstTimeDelta as the last time delta to be able to use the
|
|
// normal methods for adding later samples.
|
|
c.setLastTimeDelta(firstTimeDelta)
|
|
|
|
c.setLastSample(s)
|
|
c.setNextSampleOffset(varbitThirdSampleBitOffset)
|
|
return []chunk{c}, nil
|
|
}
|
|
|
|
// addLastSample isa a helper method only used by c.add() and in other helper
|
|
// methods called by c.add(). It simply sets the given sample as the last sample
|
|
// in the heador and declares the chunk closed. In other words, addLastSample
|
|
// adds the very last sample added to this chunk ever, while setLastSample sets
|
|
// the sample most recently added to the chunk so that it can be used for the
|
|
// calculations required to add the next sample.
|
|
func (c *varbitChunk) addLastSample(s model.SamplePair) []chunk {
|
|
c.setLastSample(s)
|
|
(*c)[varbitFlagOffset] |= 0x80
|
|
return []chunk{c}
|
|
}
|
|
|
|
// addLaterSample is a helper method only used by c.add(). It adds a third or
|
|
// later sample.
|
|
func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chunk, error) {
|
|
var (
|
|
lastTime = c.lastTime()
|
|
lastTimeDelta = c.lastTimeDelta()
|
|
newTimeDelta = s.Timestamp - lastTime
|
|
lastValue = c.lastValue()
|
|
encoding = c.valueEncoding()
|
|
)
|
|
|
|
if newTimeDelta < 0 {
|
|
return nil, fmt.Errorf("Δt is less than zero: %v", newTimeDelta)
|
|
}
|
|
if offset == varbitThirdSampleBitOffset {
|
|
offset, encoding = c.prepForThirdSample(lastValue, s.Value, encoding)
|
|
}
|
|
if newTimeDelta > varbitMaxTimeDelta {
|
|
// A time delta too great. Still, we can add it as a last sample
|
|
// before overflowing.
|
|
return c.addLastSample(s), nil
|
|
}
|
|
|
|
// Analyze worst case, does it fit? If not, set new sample as the last.
|
|
if int(offset)+varbitWorstCaseBitsPerSample[encoding] > chunkLen*8 {
|
|
return c.addLastSample(s), nil
|
|
}
|
|
|
|
// Transcoding/overflow decisions first.
|
|
if encoding == varbitZeroEncoding && s.Value != lastValue {
|
|
// Cannot go on with zero encoding.
|
|
if offset > chunkLen*4 {
|
|
// Chunk already half full. Don't transcode, overflow instead.
|
|
return addToOverflowChunk(c, s)
|
|
}
|
|
if isInt32(s.Value - lastValue) {
|
|
// Trying int encoding looks promising.
|
|
return transcodeAndAdd(newVarbitChunk(varbitIntDoubleDeltaEncoding), c, s)
|
|
}
|
|
return transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s)
|
|
}
|
|
if encoding == varbitIntDoubleDeltaEncoding && !isInt32(s.Value-lastValue) {
|
|
// Cannot go on with int encoding.
|
|
if offset > chunkLen*4 {
|
|
// Chunk already half full. Don't transcode, overflow instead.
|
|
return addToOverflowChunk(c, s)
|
|
}
|
|
return transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s)
|
|
}
|
|
|
|
offset, overflow := c.addDDTime(offset, lastTimeDelta, newTimeDelta)
|
|
if overflow {
|
|
return c.addLastSample(s), nil
|
|
}
|
|
switch encoding {
|
|
case varbitZeroEncoding:
|
|
// Nothing to do.
|
|
case varbitIntDoubleDeltaEncoding:
|
|
offset = c.addDDValue(offset, lastValue, s.Value)
|
|
case varbitXOREncoding:
|
|
offset = c.addXORValue(offset, lastValue, s.Value)
|
|
case varbitDirectEncoding:
|
|
offset = c.addBitPattern(offset, math.Float64bits(float64(s.Value)), 64)
|
|
default:
|
|
return nil, fmt.Errorf("unknown Varbit value encoding: %v", encoding)
|
|
}
|
|
|
|
c.setNextSampleOffset(offset)
|
|
c.setLastSample(s)
|
|
return []chunk{c}, nil
|
|
}
|
|
|
|
func (c varbitChunk) prepForThirdSample(
|
|
lastValue, newValue model.SampleValue, encoding varbitValueEncoding,
|
|
) (uint16, varbitValueEncoding) {
|
|
var (
|
|
offset = varbitThirdSampleBitOffset
|
|
firstValue = c.firstValue()
|
|
firstValueDelta = lastValue - firstValue
|
|
firstXOR = math.Float64bits(float64(firstValue)) ^ math.Float64bits(float64(lastValue))
|
|
_, firstSignificantBits = countBits(firstXOR)
|
|
secondXOR = math.Float64bits(float64(lastValue)) ^ math.Float64bits(float64(newValue))
|
|
_, secondSignificantBits = countBits(secondXOR)
|
|
)
|
|
// Now pick an initial encoding and prepare things accordingly.
|
|
// However, never pick an encoding "below" the one initially set.
|
|
switch {
|
|
case encoding == varbitZeroEncoding && lastValue == firstValue && lastValue == newValue:
|
|
// Stay at zero encoding.
|
|
// No value to be set.
|
|
// No offset change required.
|
|
case encoding <= varbitIntDoubleDeltaEncoding && isInt32(firstValueDelta):
|
|
encoding = varbitIntDoubleDeltaEncoding
|
|
binary.BigEndian.PutUint32(
|
|
c[varbitFirstValueDeltaOffset:],
|
|
uint32(int32(firstValueDelta)),
|
|
)
|
|
c.setLastValueDelta(int32(firstValueDelta))
|
|
offset += 32
|
|
case encoding == varbitDirectEncoding || firstSignificantBits+secondSignificantBits > 100:
|
|
// Heuristics based on three samples only is a bit weak,
|
|
// but if we need 50+13 = 63 bits per sample already
|
|
// now, we might be better off going for direct encoding.
|
|
encoding = varbitDirectEncoding
|
|
// Put bit pattern directly where otherwise the delta would have gone.
|
|
binary.BigEndian.PutUint64(
|
|
c[varbitFirstValueDeltaOffset:],
|
|
math.Float64bits(float64(lastValue)),
|
|
)
|
|
offset += 64
|
|
default:
|
|
encoding = varbitXOREncoding
|
|
offset = c.addXORValue(offset, firstValue, lastValue)
|
|
}
|
|
c.setValueEncoding(encoding)
|
|
c.setNextSampleOffset(offset)
|
|
return offset, encoding
|
|
}
|
|
|
|
// addDDTime requires that lastTimeDelta and newTimeDelta are positive and don't overflow 24bit.
|
|
func (c varbitChunk) addDDTime(offset uint16, lastTimeDelta, newTimeDelta model.Time) (newOffset uint16, overflow bool) {
|
|
timeDD := newTimeDelta - lastTimeDelta
|
|
|
|
if !isSignedIntN(int64(timeDD), 23) {
|
|
return offset, true
|
|
}
|
|
|
|
c.setLastTimeDelta(newTimeDelta)
|
|
repeats, repeatsOffset := c.zeroDDTRepeats()
|
|
|
|
if timeDD == 0 {
|
|
if repeats == 0 || repeats == 128 {
|
|
// First zeroDDT, or counter full, prepare new counter.
|
|
offset = c.addZeroBit(offset)
|
|
repeatsOffset = offset
|
|
offset += 7
|
|
repeats = 0
|
|
}
|
|
c.setZeroDDTRepeats(repeats+1, repeatsOffset)
|
|
return offset, false
|
|
}
|
|
|
|
// No zero repeat. If we had any before, clear the DDT offset.
|
|
c.setZeroDDTRepeats(0, repeatsOffset)
|
|
|
|
switch {
|
|
case isSignedIntN(int64(timeDD), 6):
|
|
offset = c.addOneBitsWithTrailingZero(offset, 1)
|
|
offset = c.addSignedInt(offset, int64(timeDD), 6)
|
|
case isSignedIntN(int64(timeDD), 17):
|
|
offset = c.addOneBitsWithTrailingZero(offset, 2)
|
|
offset = c.addSignedInt(offset, int64(timeDD), 17)
|
|
case isSignedIntN(int64(timeDD), 23):
|
|
offset = c.addOneBits(offset, 3)
|
|
offset = c.addSignedInt(offset, int64(timeDD), 23)
|
|
default:
|
|
panic("unexpected required bits for ΔΔt")
|
|
}
|
|
return offset, false
|
|
}
|
|
|
|
// addDDValue requires that newValue-lastValue can be represented with an int32.
|
|
func (c varbitChunk) addDDValue(offset uint16, lastValue, newValue model.SampleValue) uint16 {
|
|
newValueDelta := int64(newValue - lastValue)
|
|
lastValueDelta := c.lastValueDelta()
|
|
valueDD := newValueDelta - int64(lastValueDelta)
|
|
c.setLastValueDelta(int32(newValueDelta))
|
|
|
|
switch {
|
|
case valueDD == 0:
|
|
return c.addZeroBit(offset)
|
|
case isSignedIntN(valueDD, 6):
|
|
offset = c.addOneBitsWithTrailingZero(offset, 1)
|
|
return c.addSignedInt(offset, valueDD, 6)
|
|
case isSignedIntN(valueDD, 13):
|
|
offset = c.addOneBitsWithTrailingZero(offset, 2)
|
|
return c.addSignedInt(offset, valueDD, 13)
|
|
case isSignedIntN(valueDD, 20):
|
|
offset = c.addOneBitsWithTrailingZero(offset, 3)
|
|
return c.addSignedInt(offset, valueDD, 20)
|
|
case isSignedIntN(valueDD, 33):
|
|
offset = c.addOneBits(offset, 4)
|
|
return c.addSignedInt(offset, valueDD, 33)
|
|
default:
|
|
panic("unexpected required bits for ΔΔv")
|
|
}
|
|
}
|
|
|
|
func (c varbitChunk) addXORValue(offset uint16, lastValue, newValue model.SampleValue) uint16 {
|
|
lastPattern := math.Float64bits(float64(lastValue))
|
|
newPattern := math.Float64bits(float64(newValue))
|
|
xor := lastPattern ^ newPattern
|
|
if xor == 0 {
|
|
return c.addZeroBit(offset)
|
|
}
|
|
|
|
lastLeadingBits := c[varbitLastLeadingZerosCountOffset]
|
|
lastSignificantBits := c[varbitLastSignificantBitsCountOffset]
|
|
newLeadingBits, newSignificantBits := countBits(xor)
|
|
|
|
// Short entry if the new significant bits fit into the same box as the
|
|
// last significant bits. However, should the new significant bits be
|
|
// shorter by 10 or more, go for a long entry instead, as we will
|
|
// probably save more (11 bit one-time overhead, potentially more to
|
|
// save later).
|
|
if newLeadingBits >= lastLeadingBits &&
|
|
newLeadingBits+newSignificantBits <= lastLeadingBits+lastSignificantBits &&
|
|
lastSignificantBits-newSignificantBits < 10 {
|
|
offset = c.addOneBitsWithTrailingZero(offset, 1)
|
|
return c.addBitPattern(
|
|
offset,
|
|
xor>>(64-lastLeadingBits-lastSignificantBits),
|
|
uint16(lastSignificantBits),
|
|
)
|
|
}
|
|
|
|
// Long entry.
|
|
c[varbitLastLeadingZerosCountOffset] = newLeadingBits
|
|
c[varbitLastSignificantBitsCountOffset] = newSignificantBits
|
|
offset = c.addOneBits(offset, 2)
|
|
offset = c.addBitPattern(offset, uint64(newLeadingBits), 5)
|
|
offset = c.addBitPattern(offset, uint64(newSignificantBits-1), 6) // Note -1!
|
|
return c.addBitPattern(
|
|
offset,
|
|
xor>>(64-newLeadingBits-newSignificantBits),
|
|
uint16(newSignificantBits),
|
|
)
|
|
}
|
|
|
|
func (c varbitChunk) addZeroBit(offset uint16) uint16 {
|
|
if offset < varbitNextSampleBitOffsetThreshold {
|
|
// Writing a zero to a never touched area is a no-op.
|
|
// Just increase the offset.
|
|
return offset + 1
|
|
}
|
|
newByte := c[offset/8] &^ bitMask[1][offset%8]
|
|
c[offset/8] = newByte
|
|
// TODO(beorn7): The two lines above could be written as
|
|
// c[offset/8] &^= bitMask[1][offset%8]
|
|
// However, that tickles a compiler bug with GOARCH=386.
|
|
// See https://github.com/prometheus/prometheus/issues/1509
|
|
return offset + 1
|
|
}
|
|
|
|
func (c varbitChunk) addOneBits(offset uint16, n uint16) uint16 {
|
|
if n > 7 {
|
|
panic("unexpected number of control bits")
|
|
}
|
|
b := 8 - offset%8
|
|
if b > n {
|
|
b = n
|
|
}
|
|
c[offset/8] |= bitMask[b][offset%8]
|
|
offset += b
|
|
b = n - b
|
|
if b > 0 {
|
|
c[offset/8] |= bitMask[b][0]
|
|
offset += b
|
|
}
|
|
return offset
|
|
}
|
|
func (c varbitChunk) addOneBitsWithTrailingZero(offset uint16, n uint16) uint16 {
|
|
offset = c.addOneBits(offset, n)
|
|
return c.addZeroBit(offset)
|
|
}
|
|
|
|
// addSignedInt adds i as a signed integer with n bits. It requires i to be
|
|
// representable as such. (Check with isSignedIntN first.)
|
|
func (c varbitChunk) addSignedInt(offset uint16, i int64, n uint16) uint16 {
|
|
if i < 0 && n < 64 {
|
|
i += 1 << n
|
|
}
|
|
return c.addBitPattern(offset, uint64(i), n)
|
|
}
|
|
|
|
// addBitPattern adds the last n bits of the given pattern. Other bits in the
|
|
// pattern must be 0.
|
|
func (c varbitChunk) addBitPattern(offset uint16, pattern uint64, n uint16) uint16 {
|
|
var (
|
|
byteOffset = offset / 8
|
|
bitsToWrite = 8 - offset%8
|
|
newOffset = offset + n
|
|
)
|
|
|
|
// Clean up the parts of the footer we will write into. (But not more as
|
|
// we are still using the value related part of the footer when we have
|
|
// already overwritten timestamp related parts.)
|
|
if newOffset > varbitNextSampleBitOffsetThreshold {
|
|
pos := offset
|
|
if pos < varbitNextSampleBitOffsetThreshold {
|
|
pos = varbitNextSampleBitOffsetThreshold
|
|
}
|
|
for pos < newOffset {
|
|
posInByte := pos % 8
|
|
bitsToClear := newOffset - pos
|
|
if bitsToClear > 8-posInByte {
|
|
bitsToClear = 8 - posInByte
|
|
}
|
|
c[pos/8] &^= bitMask[bitsToClear][posInByte]
|
|
pos += bitsToClear
|
|
}
|
|
}
|
|
|
|
for n > 0 {
|
|
if n <= bitsToWrite {
|
|
c[byteOffset] |= byte(pattern << (bitsToWrite - n))
|
|
break
|
|
}
|
|
c[byteOffset] |= byte(pattern >> (n - bitsToWrite))
|
|
n -= bitsToWrite
|
|
bitsToWrite = 8
|
|
byteOffset++
|
|
}
|
|
return newOffset
|
|
}
|
|
|
|
// readBitPattern reads n bits at the given offset and returns them as the last
|
|
// n bits in a uint64.
|
|
func (c varbitChunk) readBitPattern(offset, n uint16) uint64 {
|
|
var (
|
|
result uint64
|
|
byteOffset = offset / 8
|
|
bitOffset = offset % 8
|
|
trailingBits, bitsToRead uint16
|
|
)
|
|
|
|
for n > 0 {
|
|
trailingBits = 0
|
|
bitsToRead = 8 - bitOffset
|
|
if bitsToRead > n {
|
|
trailingBits = bitsToRead - n
|
|
bitsToRead = n
|
|
}
|
|
result <<= bitsToRead
|
|
result |= uint64(
|
|
(c[byteOffset] & bitMask[bitsToRead][bitOffset]) >> trailingBits,
|
|
)
|
|
n -= bitsToRead
|
|
byteOffset++
|
|
bitOffset = 0
|
|
}
|
|
return result
|
|
}
|
|
|
|
type varbitChunkIterator struct {
|
|
c varbitChunk
|
|
// pos is the bit position within the chunk for the next sample to be
|
|
// decoded when scan() is called (i.e. it is _not_ the bit position of
|
|
// the sample currently returned by value()). The symbolic values
|
|
// varbitFirstSampleBitOffset and varbitSecondSampleBitOffset are also
|
|
// used for pos. len is the offset of the first bit in the chunk that is
|
|
// not part of the payload. If pos==len, then the iterator is positioned
|
|
// behind the last sample in the payload. However, the next call of
|
|
// scan() still has to check if the chunk is closed, in which case there
|
|
// is one more sample, saved in the header. To mark the iterator as
|
|
// having scanned that last sample, too, pos is set to len+1.
|
|
pos, len uint16
|
|
t, dT model.Time
|
|
repeats byte // Repeats of ΔΔt=0.
|
|
v model.SampleValue
|
|
dV int64 // Only used for int value encoding.
|
|
leading, significant uint16
|
|
enc varbitValueEncoding
|
|
lastError error
|
|
rewound bool
|
|
nextT model.Time // Only for rewound state.
|
|
nextV model.SampleValue // Only for rewound state.
|
|
}
|
|
|
|
func newVarbitChunkIterator(c varbitChunk) *varbitChunkIterator {
|
|
return &varbitChunkIterator{
|
|
c: c,
|
|
len: c.nextSampleOffset(),
|
|
t: model.Earliest,
|
|
enc: c.valueEncoding(),
|
|
significant: 1,
|
|
}
|
|
}
|
|
|
|
// lastTimestamp implements chunkIterator.
|
|
func (it *varbitChunkIterator) lastTimestamp() (model.Time, error) {
|
|
if it.len == varbitFirstSampleBitOffset {
|
|
// No samples in the chunk yet.
|
|
return model.Earliest, it.lastError
|
|
}
|
|
return it.c.lastTime(), it.lastError
|
|
}
|
|
|
|
// contains implements chunkIterator.
|
|
func (it *varbitChunkIterator) contains(t model.Time) (bool, error) {
|
|
last, err := it.lastTimestamp()
|
|
if err != nil {
|
|
it.lastError = err
|
|
return false, err
|
|
}
|
|
return !t.Before(it.c.firstTime()) &&
|
|
!t.After(last), it.lastError
|
|
}
|
|
|
|
// scan implements chunkIterator.
|
|
func (it *varbitChunkIterator) scan() bool {
|
|
if it.lastError != nil {
|
|
return false
|
|
}
|
|
if it.rewound {
|
|
it.t = it.nextT
|
|
it.v = it.nextV
|
|
it.rewound = false
|
|
return true
|
|
}
|
|
if it.pos > it.len {
|
|
return false
|
|
}
|
|
if it.pos == it.len && it.repeats == 0 {
|
|
it.pos = it.len + 1
|
|
if !it.c.closed() {
|
|
return false
|
|
}
|
|
it.t = it.c.lastTime()
|
|
it.v = it.c.lastValue()
|
|
return it.lastError == nil
|
|
}
|
|
if it.pos == varbitFirstSampleBitOffset {
|
|
it.t = it.c.firstTime()
|
|
it.v = it.c.firstValue()
|
|
it.pos = varbitSecondSampleBitOffset
|
|
return it.lastError == nil
|
|
}
|
|
if it.pos == varbitSecondSampleBitOffset {
|
|
if it.len == varbitThirdSampleBitOffset && !it.c.closed() {
|
|
// Special case: Chunk has only two samples.
|
|
it.t = it.c.lastTime()
|
|
it.v = it.c.lastValue()
|
|
it.pos = it.len + 1
|
|
return it.lastError == nil
|
|
}
|
|
it.dT = it.c.firstTimeDelta()
|
|
it.t += it.dT
|
|
// Value depends on encoding.
|
|
switch it.enc {
|
|
case varbitZeroEncoding:
|
|
it.pos = varbitThirdSampleBitOffset
|
|
case varbitIntDoubleDeltaEncoding:
|
|
it.dV = int64(it.c.firstValueDelta())
|
|
it.v += model.SampleValue(it.dV)
|
|
it.pos = varbitThirdSampleBitOffset + 32
|
|
case varbitXOREncoding:
|
|
it.pos = varbitThirdSampleBitOffset
|
|
it.readXOR()
|
|
case varbitDirectEncoding:
|
|
it.v = model.SampleValue(math.Float64frombits(
|
|
binary.BigEndian.Uint64(it.c[varbitThirdSampleBitOffset/8:]),
|
|
))
|
|
it.pos = varbitThirdSampleBitOffset + 64
|
|
default:
|
|
it.lastError = fmt.Errorf("unknown varbit value encoding: %v", it.enc)
|
|
}
|
|
return it.lastError == nil
|
|
}
|
|
// 3rd sample or later does not have special cases anymore.
|
|
it.readDDT()
|
|
switch it.enc {
|
|
case varbitZeroEncoding:
|
|
// Do nothing.
|
|
case varbitIntDoubleDeltaEncoding:
|
|
it.readDDV()
|
|
case varbitXOREncoding:
|
|
it.readXOR()
|
|
case varbitDirectEncoding:
|
|
it.v = model.SampleValue(math.Float64frombits(it.readBitPattern(64)))
|
|
return it.lastError == nil
|
|
default:
|
|
it.lastError = fmt.Errorf("unknown varbit value encoding: %v", it.enc)
|
|
return false
|
|
}
|
|
return it.lastError == nil
|
|
}
|
|
|
|
// findAtOrBefore implements chunkIterator.
|
|
func (it *varbitChunkIterator) findAtOrBefore(t model.Time) bool {
|
|
if it.len == 0 || t.Before(it.c.firstTime()) {
|
|
return false
|
|
}
|
|
last := it.c.lastTime()
|
|
if !t.Before(last) {
|
|
it.t = last
|
|
it.v = it.c.lastValue()
|
|
it.pos = it.len + 1
|
|
return true
|
|
}
|
|
if t == it.t {
|
|
return it.lastError == nil
|
|
}
|
|
if t.Before(it.t) || it.rewound {
|
|
it.reset()
|
|
}
|
|
|
|
var (
|
|
prevT = model.Earliest
|
|
prevV model.SampleValue
|
|
)
|
|
for it.scan() && t.After(it.t) {
|
|
prevT = it.t
|
|
prevV = it.v
|
|
// TODO(beorn7): If we are in a repeat, we could iterate forward
|
|
// much faster.
|
|
}
|
|
if t == it.t {
|
|
return it.lastError == nil
|
|
}
|
|
it.rewind(prevT, prevV)
|
|
return it.lastError == nil
|
|
}
|
|
|
|
// findAtOrAfter implements chunkIterator.
|
|
func (it *varbitChunkIterator) findAtOrAfter(t model.Time) bool {
|
|
if it.len == 0 || t.After(it.c.lastTime()) {
|
|
return false
|
|
}
|
|
first := it.c.firstTime()
|
|
if !t.After(first) {
|
|
it.reset()
|
|
return it.scan()
|
|
}
|
|
if t == it.t {
|
|
return it.lastError == nil
|
|
}
|
|
if t.Before(it.t) {
|
|
it.reset()
|
|
}
|
|
for it.scan() && t.After(it.t) {
|
|
// TODO(beorn7): If we are in a repeat, we could iterate forward
|
|
// much faster.
|
|
}
|
|
return it.lastError == nil
|
|
}
|
|
|
|
// value implements chunkIterator.
|
|
func (it *varbitChunkIterator) value() model.SamplePair {
|
|
return model.SamplePair{
|
|
Timestamp: it.t,
|
|
Value: it.v,
|
|
}
|
|
}
|
|
|
|
// err implements chunkIterator.
|
|
func (it *varbitChunkIterator) err() error {
|
|
return it.lastError
|
|
}
|
|
|
|
func (it *varbitChunkIterator) readDDT() {
|
|
if it.repeats > 0 {
|
|
it.repeats--
|
|
} else {
|
|
switch it.readControlBits(3) {
|
|
case 0:
|
|
it.repeats = byte(it.readBitPattern(7))
|
|
case 1:
|
|
it.dT += model.Time(it.readSignedInt(6))
|
|
case 2:
|
|
it.dT += model.Time(it.readSignedInt(17))
|
|
case 3:
|
|
it.dT += model.Time(it.readSignedInt(23))
|
|
default:
|
|
panic("unexpected number of control bits")
|
|
}
|
|
}
|
|
it.t += it.dT
|
|
}
|
|
|
|
func (it *varbitChunkIterator) readDDV() {
|
|
switch it.readControlBits(4) {
|
|
case 0:
|
|
// Do nothing.
|
|
case 1:
|
|
it.dV += it.readSignedInt(6)
|
|
case 2:
|
|
it.dV += it.readSignedInt(13)
|
|
case 3:
|
|
it.dV += it.readSignedInt(20)
|
|
case 4:
|
|
it.dV += it.readSignedInt(33)
|
|
default:
|
|
panic("unexpected number of control bits")
|
|
}
|
|
it.v += model.SampleValue(it.dV)
|
|
}
|
|
|
|
func (it *varbitChunkIterator) readXOR() {
|
|
switch it.readControlBits(2) {
|
|
case 0:
|
|
return
|
|
case 1:
|
|
// Do nothing right now. All done below.
|
|
case 2:
|
|
it.leading = uint16(it.readBitPattern(5))
|
|
it.significant = uint16(it.readBitPattern(6)) + 1
|
|
default:
|
|
panic("unexpected number of control bits")
|
|
}
|
|
pattern := math.Float64bits(float64(it.v))
|
|
pattern ^= it.readBitPattern(it.significant) << (64 - it.significant - it.leading)
|
|
it.v = model.SampleValue(math.Float64frombits(pattern))
|
|
}
|
|
|
|
// readControlBits reads successive 1-bits and stops after reading the first
|
|
// 0-bit. It also stops once it has read max bits. It returns the number of read
|
|
// 1-bits.
|
|
func (it *varbitChunkIterator) readControlBits(max uint16) uint16 {
|
|
var count uint16
|
|
for count < max && int(it.pos/8) < len(it.c) {
|
|
b := it.c[it.pos/8] & bitMask[1][it.pos%8]
|
|
it.pos++
|
|
if b == 0 {
|
|
return count
|
|
}
|
|
count++
|
|
}
|
|
if int(it.pos/8) >= len(it.c) {
|
|
it.lastError = errChunkBoundsExceeded
|
|
}
|
|
return count
|
|
}
|
|
|
|
func (it *varbitChunkIterator) readBitPattern(n uint16) uint64 {
|
|
if len(it.c)*8 < int(it.pos)+int(n) {
|
|
it.lastError = errChunkBoundsExceeded
|
|
return 0
|
|
}
|
|
u := it.c.readBitPattern(it.pos, n)
|
|
it.pos += n
|
|
return u
|
|
}
|
|
|
|
func (it *varbitChunkIterator) readSignedInt(n uint16) int64 {
|
|
u := it.readBitPattern(n)
|
|
if n < 64 && u >= 1<<(n-1) {
|
|
u -= 1 << n
|
|
}
|
|
return int64(u)
|
|
}
|
|
|
|
// reset puts the chunk iterator into the state it had upon creation.
|
|
func (it *varbitChunkIterator) reset() {
|
|
it.pos = 0
|
|
it.t = model.Earliest
|
|
it.dT = 0
|
|
it.repeats = 0
|
|
it.v = 0
|
|
it.dV = 0
|
|
it.leading = 0
|
|
it.significant = 1
|
|
it.rewound = false
|
|
}
|
|
|
|
// rewind "rewinds" the chunk iterator by one step. Since one cannot simply
|
|
// rewind a Varbit chunk, the old values have to be provided by the
|
|
// caller. Rewinding an already rewound chunk panics. After a call of scan or
|
|
// reset, a chunk can be rewound again.
|
|
func (it *varbitChunkIterator) rewind(t model.Time, v model.SampleValue) {
|
|
if it.rewound {
|
|
panic("cannot rewind varbit chunk twice")
|
|
}
|
|
it.rewound = true
|
|
it.nextT = it.t
|
|
it.nextV = it.v
|
|
it.t = t
|
|
it.v = v
|
|
}
|