prometheus/tsdb/chunkenc/xor.go

511 lines
12 KiB
Go

// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The code in this file was largely written by Damian Gryski as part of
// https://github.com/dgryski/go-tsz and published under the license below.
// It was modified to accommodate reading from byte slices without modifying
// the underlying bytes, which would panic when reading from mmapped
// read-only byte slices.
// Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
// All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package chunkenc
import (
"encoding/binary"
"math"
"math/bits"
"github.com/prometheus/prometheus/model/histogram"
)
// chunkCompactCapacityThreshold is the number of spare bytes of capacity a
// chunk's stream may carry beyond its length before Compact reallocates the
// stream to exactly its length.
const chunkCompactCapacityThreshold = 32
// XORChunk holds XOR encoded sample data.
// The first two bytes of the underlying stream hold the number of samples as
// a big-endian uint16; the encoded samples follow those header bytes.
type XORChunk struct {
// b is the bit stream backing the chunk, header bytes included.
b bstream
}
// NewXORChunk returns a new, empty chunk with XOR encoding.
// The stream starts out with the two zeroed header bytes that hold the sample
// count, backed by an initial capacity of 128 bytes.
func NewXORChunk() *XORChunk {
	header := make([]byte, 2, 128)
	return &XORChunk{
		b: bstream{
			stream: header,
			count:  0,
		},
	}
}
// Reset hands the given bytes to the Reset method of the chunk's underlying
// bit stream, so that the chunk operates on stream from then on.
func (c *XORChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// Encoding returns the encoding type, which is always EncXOR for an XORChunk.
func (c *XORChunk) Encoding() Encoding {
return EncXOR
}
// Bytes returns the underlying byte slice of the chunk, as reported by the
// bit stream. The two-byte sample count header is part of the result.
// NOTE(review): presumably the slice aliases the chunk's storage rather than
// being a copy — confirm against bstream.bytes.
func (c *XORChunk) Bytes() []byte {
return c.b.bytes()
}
// NumSamples returns the number of samples in the chunk, decoded from the
// big-endian uint16 stored in the first two bytes of the chunk.
func (c *XORChunk) NumSamples() int {
	header := c.Bytes()
	count := binary.BigEndian.Uint16(header)
	return int(count)
}
// Compact implements the Chunk interface.
// When the stream's capacity exceeds its length by more than
// chunkCompactCapacityThreshold bytes, the stream is copied into a freshly
// allocated slice of exactly its length; otherwise nothing happens.
func (c *XORChunk) Compact() {
	used := len(c.b.stream)
	if cap(c.b.stream) <= used+chunkCompactCapacityThreshold {
		// Not enough slack to be worth a reallocation.
		return
	}
	trimmed := make([]byte, used)
	copy(trimmed, c.b.stream)
	c.b.stream = trimmed
}
// Appender implements the Chunk interface.
// It is not valid to call Appender() multiple times concurrently or to use multiple
// Appenders on the same chunk.
//
// Any error hit while decoding the samples already in the chunk is returned
// together with a nil Appender.
func (c *XORChunk) Appender() (Appender, error) {
	// An appender has to start from the state it would be in had it written
	// every existing sample itself. Replay the chunk to its end and take that
	// state over from the iterator.
	it := c.iterator(nil)
	for {
		if it.Next() == ValNone {
			break
		}
	}
	if err := it.Err(); err != nil {
		return nil, err
	}

	leading := it.leading
	if it.numTotal == 0 {
		// Empty chunk: 0xff marks that no leading/trailing window exists yet,
		// which keeps xorWrite from reusing a window for the first XOR value.
		leading = 0xff
	}

	return &xorAppender{
		b:        &c.b,
		t:        it.t,
		v:        it.val,
		tDelta:   it.tDelta,
		leading:  leading,
		trailing: it.trailing,
	}, nil
}
// iterator returns an iterator over the chunk's samples that has not yet
// read anything. If it is a *xorIterator, it is reset to this chunk's bytes
// and returned; otherwise a new xorIterator is allocated.
//
// Note that a newly allocated iterator starts with t = math.MinInt64, while a
// reused one carries whatever xorIterator.Reset assigns to t.
func (c *XORChunk) iterator(it Iterator) *xorIterator {
	reused, ok := it.(*xorIterator)
	if !ok {
		raw := c.b.bytes()
		return &xorIterator{
			// The first 2 bytes hold the chunk header (the sample count);
			// the samples themselves start right after it.
			br:       newBReader(raw[2:]),
			numTotal: binary.BigEndian.Uint16(raw),
			t:        math.MinInt64,
		}
	}
	reused.Reset(c.b.bytes())
	return reused
}
// Iterator implements the Chunk interface.
// Iterator() must not be called concurrently with any modifications to the chunk,
// but after it returns you can use an Iterator concurrently with an Appender or
// other Iterators.
//
// If it is a *xorIterator, it is reset to this chunk's data and returned;
// otherwise a new iterator is allocated.
func (c *XORChunk) Iterator(it Iterator) Iterator {
return c.iterator(it)
}
// xorAppender appends float samples to the bit stream of an XORChunk.
type xorAppender struct {
// b is the chunk's bit stream that samples are written to.
b *bstream
// t and v are the timestamp and value of the most recently appended sample.
t int64
v float64
// tDelta is the timestamp delta computed for the most recent append; it is
// 0 after the first sample of a chunk.
tDelta uint64
// leading and trailing are the leading/trailing zero-bit counts of the XOR
// window last written by xorWrite. A leading value of 0xff means no window
// has been established yet.
leading uint8
trailing uint8
}
// Append encodes the sample (t, v) at the end of the chunk and increments the
// sample count stored in the chunk header.
//
// The encoding depends on how many samples the chunk already holds:
//   - none: t as a varint followed by the raw 64 bits of v;
//   - one: the timestamp delta as a uvarint followed by the XOR-encoded value;
//   - more: the delta-of-delta of the timestamp in one of several
//     prefix-coded bit buckets, followed by the XOR-encoded value.
func (a *xorAppender) Append(t int64, v float64) {
	num := binary.BigEndian.Uint16(a.b.bytes())

	var tDelta uint64
	switch num {
	case 0:
		// First sample: absolute timestamp and uncompressed value.
		scratch := make([]byte, binary.MaxVarintLen64)
		n := binary.PutVarint(scratch, t)
		for i := 0; i < n; i++ {
			a.b.writeByte(scratch[i])
		}
		a.b.writeBits(math.Float64bits(v), 64)

	case 1:
		// Second sample: plain delta to the first timestamp.
		tDelta = uint64(t - a.t)
		scratch := make([]byte, binary.MaxVarintLen64)
		n := binary.PutUvarint(scratch, tDelta)
		for i := 0; i < n; i++ {
			a.b.writeByte(scratch[i])
		}
		a.writeVDelta(v)

	default:
		tDelta = uint64(t - a.t)
		dod := int64(tDelta - a.tDelta)

		// Gorilla has a max resolution of seconds, Prometheus milliseconds.
		// Thus we use higher value range steps with larger bit size.
		//
		// TODO(beorn7): This seems to needlessly jump to large bit
		// sizes even for very small deviations from zero. Timestamp
		// compression can probably benefit from some smaller bit
		// buckets. See also what was done for histogram encoding in
		// varbit.go.
		if dod == 0 {
			a.b.writeBit(zero)
		} else if bitRange(dod, 14) {
			a.b.writeBits(0b10, 2)
			a.b.writeBits(uint64(dod), 14)
		} else if bitRange(dod, 17) {
			a.b.writeBits(0b110, 3)
			a.b.writeBits(uint64(dod), 17)
		} else if bitRange(dod, 20) {
			a.b.writeBits(0b1110, 4)
			a.b.writeBits(uint64(dod), 20)
		} else {
			a.b.writeBits(0b1111, 4)
			a.b.writeBits(uint64(dod), 64)
		}
		a.writeVDelta(v)
	}

	// Remember the state the next Append is encoded against.
	a.t, a.v, a.tDelta = t, v, tDelta

	// The byte slice is fetched again rather than reused from the top of the
	// function because the writes above went through the stream in between.
	binary.BigEndian.PutUint16(a.b.bytes(), num+1)
}
// bitRange reports whether x can be represented by nbits, i.e. whether x lies
// in the inclusive range [-(2^(nbits-1)-1), 2^(nbits-1)].
// See docs/bstream.md.
func bitRange(x int64, nbits uint8) bool {
	upper := int64(1) << (nbits - 1)
	lower := 1 - upper
	if x > upper {
		return false
	}
	return x >= lower
}
// writeVDelta XOR-encodes v against the previously appended value a.v via
// xorWrite, which may update a.leading and a.trailing. It does not update a.v.
func (a *xorAppender) writeVDelta(v float64) {
xorWrite(a.b, v, a.v, &a.leading, &a.trailing)
}
// AppendHistogram always panics: histogram samples cannot be appended to a
// float (XOR) chunk.
func (a *xorAppender) AppendHistogram(*HistogramAppender, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) {
panic("appended a histogram sample to a float chunk")
}
// AppendFloatHistogram always panics: float histogram samples cannot be
// appended to a float (XOR) chunk.
func (a *xorAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) {
panic("appended a float histogram sample to a float chunk")
}
// xorIterator decodes the samples of an XOR encoded chunk in order.
type xorIterator struct {
// br reads from the chunk's bytes after the two header bytes.
br bstreamReader
// numTotal is the sample count taken from the chunk header.
numTotal uint16
// numRead is the number of samples decoded so far.
numRead uint16
// t and val are the timestamp and value of the most recently decoded sample.
t int64
val float64
// leading and trailing are the leading/trailing zero-bit counts of the XOR
// window most recently read by xorRead.
leading uint8
trailing uint8
// tDelta is the timestamp delta between the two most recently decoded samples.
tDelta uint64
// err is the error that stopped decoding, if any; once it is set, Next and
// Seek return ValNone.
err error
}
// Seek advances the iterator to the first sample whose timestamp is at least
// t and returns ValFloat. If the iterator is already positioned on such a
// sample it does not move. ValNone is returned if the iterator has previously
// failed, or if Next returns ValNone before such a sample is found.
func (it *xorIterator) Seek(t int64) ValueType {
	if it.err != nil {
		return ValNone
	}
	for {
		// Before the first Next, it.t does not describe a real sample, so at
		// least one sample must have been read for the position to count.
		positioned := it.numRead != 0 && it.t >= t
		if positioned {
			return ValFloat
		}
		if it.Next() == ValNone {
			return ValNone
		}
	}
}
// At returns the timestamp and value of the sample the iterator is currently
// positioned on, i.e. the one decoded by the last successful Next or Seek.
func (it *xorIterator) At() (int64, float64) {
return it.t, it.val
}
// AtHistogram always panics: an XOR chunk iterator only yields float samples.
func (it *xorIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
panic("cannot call xorIterator.AtHistogram")
}
// AtFloatHistogram always panics: an XOR chunk iterator only yields float
// samples.
func (it *xorIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
panic("cannot call xorIterator.AtFloatHistogram")
}
// AtT returns the timestamp of the sample the iterator is currently
// positioned on.
func (it *xorIterator) AtT() int64 {
return it.t
}
// Err returns the error that stopped iteration, or nil if none was recorded.
func (it *xorIterator) Err() error {
return it.err
}
// Reset re-initializes the iterator to read the chunk encoded in b from its
// first sample. b must contain at least the two header bytes.
//
// Every decoding field (numRead, t, val, leading, trailing, tDelta, err) goes
// back to its zero value. Note that t becomes 0 here, whereas an iterator
// freshly created by XORChunk.iterator starts with t = math.MinInt64.
func (it *xorIterator) Reset(b []byte) {
	*it = xorIterator{
		// The first 2 bytes hold the chunk header (the sample count); the
		// samples themselves start right after it.
		br:       newBReader(b[2:]),
		numTotal: binary.BigEndian.Uint16(b),
	}
}
// Next decodes the next sample and returns ValFloat, after which At and AtT
// report that sample. It returns ValNone when all samples announced by the
// chunk header have been read, when an earlier call failed, or when decoding
// fails now; in the latter case the error is available through Err.
func (it *xorIterator) Next() ValueType {
	if it.err != nil || it.numRead == it.numTotal {
		return ValNone
	}

	switch it.numRead {
	case 0:
		// First sample: absolute timestamp as varint, value as raw 64 bits.
		ts, err := binary.ReadVarint(&it.br)
		if err != nil {
			it.err = err
			return ValNone
		}
		raw, err := it.br.readBits(64)
		if err != nil {
			it.err = err
			return ValNone
		}
		it.t = ts
		it.val = math.Float64frombits(raw)
		it.numRead++
		return ValFloat

	case 1:
		// Second sample: timestamp delta as uvarint, value XOR-encoded.
		delta, err := binary.ReadUvarint(&it.br)
		if err != nil {
			it.err = err
			return ValNone
		}
		it.tDelta = delta
		it.t += int64(it.tDelta)
		return it.readValue()
	}

	// All later samples: read the delta-of-delta bucket prefix, which is up
	// to four bits long and ends early at the first zero bit.
	var prefix byte
	for remaining := 4; remaining > 0; remaining-- {
		prefix <<= 1
		bit, err := it.br.readBitFast()
		if err != nil {
			bit, err = it.br.readBit()
		}
		if err != nil {
			it.err = err
			return ValNone
		}
		if bit == zero {
			break
		}
		prefix |= 1
	}

	var (
		width uint8
		dod   int64
	)
	switch prefix {
	case 0b10:
		width = 14
	case 0b110:
		width = 17
	case 0b1110:
		width = 20
	case 0b1111:
		// Do not use fast because it's very unlikely it will succeed.
		raw, err := it.br.readBits(64)
		if err != nil {
			it.err = err
			return ValNone
		}
		dod = int64(raw)
	}
	// A prefix of 0b0 falls through with dod == 0 and nothing more to read.

	if width != 0 {
		raw, err := it.br.readBitsFast(width)
		if err != nil {
			raw, err = it.br.readBits(width)
		}
		if err != nil {
			it.err = err
			return ValNone
		}
		// Account for negative numbers, which come back as high unsigned
		// numbers. See docs/bstream.md.
		if raw > (1 << (width - 1)) {
			raw -= 1 << width
		}
		dod = int64(raw)
	}

	it.tDelta = uint64(int64(it.tDelta) + dod)
	it.t += int64(it.tDelta)
	return it.readValue()
}
// readValue decodes the XOR-encoded value of the current sample into it.val
// and counts the sample as read. On failure it records the error in it.err,
// leaves numRead untouched and returns ValNone.
func (it *xorIterator) readValue() ValueType {
	if err := xorRead(&it.br, &it.val, &it.leading, &it.trailing); err != nil {
		it.err = err
		return ValNone
	}
	it.numRead++
	return ValFloat
}
// xorWrite writes newValue to b, encoded as the XOR of its bits with those of
// currentValue.
//
// leading and trailing describe the window of significant bits used by the
// previous write; *leading == 0xff means there is none yet. They are updated
// whenever a new window is written. The layout is:
//   - a single 0 bit if both values have identical bits;
//   - bits 1,0 followed by the XOR bits inside the previous window;
//   - bits 1,1 followed by 5 bits of leading-zero count, 6 bits of
//     significant-bit count and then the significant bits themselves.
func xorWrite(b *bstream, newValue, currentValue float64, leading, trailing *uint8) {
	xor := math.Float64bits(newValue) ^ math.Float64bits(currentValue)
	if xor == 0 {
		b.writeBit(zero)
		return
	}
	b.writeBit(one)

	lz := uint8(bits.LeadingZeros64(xor))
	tz := uint8(bits.TrailingZeros64(xor))
	// Clamp number of leading zeros to avoid overflow when encoding: only 5
	// bits are available for it.
	if lz > 31 {
		lz = 31
	}

	fitsPreviousWindow := *leading != 0xff && lz >= *leading && tz >= *trailing
	if fitsPreviousWindow {
		// Stick with the current leading/trailing and emit only the bits in
		// between.
		b.writeBit(zero)
		b.writeBits(xor>>*trailing, 64-int(*leading)-int(*trailing))
		return
	}

	// Establish a new window and report it back to the caller.
	*leading, *trailing = lz, tz

	b.writeBit(one)
	b.writeBits(uint64(lz), 5)

	// Note that if lz == tz == 0, then sigbits == 64. But that value doesn't
	// actually fit into the 6 bits we have. Luckily, we never need to encode
	// 0 significant bits, since that would put us in the other case
	// (xor == 0). So instead we write out a 0 and adjust it back to 64 on
	// unpacking.
	sigbits := 64 - lz - tz
	b.writeBits(uint64(sigbits), 6)
	b.writeBits(xor>>tz, int(sigbits))
}
// xorRead decodes one XOR-encoded value from br and applies it to *value,
// which must hold the previous value on entry and holds the decoded value on
// return.
//
// leading and trailing carry the window of significant bits between calls;
// they are overwritten when the stream announces a new window. Any read error
// is returned as is; in that case *value is left unchanged, but *leading and
// *trailing may already have been updated.
func xorRead(br *bstreamReader, value *float64, leading, trailing *uint8) error {
	// First control bit: zero means the value did not change.
	ctrl, err := br.readBitFast()
	if err != nil {
		ctrl, err = br.readBit()
	}
	if err != nil {
		return err
	}
	if ctrl == zero {
		return nil
	}

	// Second control bit: zero reuses the previous window, one announces a
	// new one.
	ctrl, err = br.readBitFast()
	if err != nil {
		ctrl, err = br.readBit()
	}
	if err != nil {
		return err
	}

	var (
		payload            uint64
		lead, trail, width uint8
	)
	if ctrl != zero {
		// 5 bits: number of leading zeros.
		payload, err = br.readBitsFast(5)
		if err != nil {
			payload, err = br.readBits(5)
		}
		if err != nil {
			return err
		}
		lead = uint8(payload)

		// 6 bits: number of significant bits.
		payload, err = br.readBitsFast(6)
		if err != nil {
			payload, err = br.readBits(6)
		}
		if err != nil {
			return err
		}
		width = uint8(payload)
		// 0 significant bits here means we overflowed and we actually
		// need 64; see comment in xorWrite.
		if width == 0 {
			width = 64
		}
		trail = 64 - lead - width

		// Update leading/trailing zero bits for the caller.
		*leading, *trailing = lead, trail
	} else {
		// Reuse leading/trailing zero bits.
		lead, trail = *leading, *trailing
		width = 64 - lead - trail
	}

	payload, err = br.readBitsFast(width)
	if err != nil {
		payload, err = br.readBits(width)
	}
	if err != nil {
		return err
	}

	// Shift the significant bits back into place and undo the XOR.
	restored := math.Float64bits(*value) ^ (payload << trail)
	*value = math.Float64frombits(restored)
	return nil
}