mirror of https://github.com/prometheus/prometheus
Optimized bstream reader used by XORChunk iterator (#7390)
* Optimized bstream reader
  Signed-off-by: Marco Pracucci <marco@pracucci.com>
* Fixed linter
  Signed-off-by: Marco Pracucci <marco@pracucci.com>
* Added license to new file
  Signed-off-by: Marco Pracucci <marco@pracucci.com>
* Fixed type cast
  Signed-off-by: Marco Pracucci <marco@pracucci.com>
* Changed comments
  Signed-off-by: Marco Pracucci <marco@pracucci.com>
* Improved comments and rolledback no-op changes
  Signed-off-by: Marco Pracucci <marco@pracucci.com>
* Fixed race condition
  Signed-off-by: Marco Pracucci <marco@pracucci.com>
pull/7401/head
parent
268b4c29e1
commit
f42ed03dc5
|
@ -49,10 +49,6 @@ type bstream struct {
|
|||
count uint8 // how many bits are valid in current byte
|
||||
}
|
||||
|
||||
func newBReader(b []byte) bstream {
|
||||
return bstream{stream: b, count: 8}
|
||||
}
|
||||
|
||||
func (b *bstream) bytes() []byte {
|
||||
return b.stream
|
||||
}
|
||||
|
@ -111,90 +107,141 @@ func (b *bstream) writeBits(u uint64, nbits int) {
|
|||
}
|
||||
}
|
||||
|
||||
func (b *bstream) readBit() (bit, error) {
|
||||
if len(b.stream) == 0 {
|
||||
type bstreamReader struct {
|
||||
stream []byte
|
||||
streamOffset int // The offset from which read the next byte from the stream.
|
||||
|
||||
buffer uint64 // The current buffer, filled from the stream, containing up to 8 bytes from which read bits.
|
||||
valid uint8 // The number of bits valid to read (from left) in the current buffer.
|
||||
}
|
||||
|
||||
func newBReader(b []byte) bstreamReader {
|
||||
return bstreamReader{
|
||||
stream: b,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bstreamReader) readBit() (bit, error) {
|
||||
if b.valid == 0 {
|
||||
if !b.loadNextBuffer(1) {
|
||||
return false, io.EOF
|
||||
}
|
||||
}
|
||||
|
||||
return b.readBitFast()
|
||||
}
|
||||
|
||||
// readBitFast is like readBit but can return io.EOF if the internal buffer is empty.
|
||||
// If it returns io.EOF, the caller should retry by calling readBit().
|
||||
// This function must be kept small and a leaf in order to help the compiler inline it
|
||||
// and further improve performance.
|
||||
func (b *bstreamReader) readBitFast() (bit, error) {
|
||||
if b.valid == 0 {
|
||||
return false, io.EOF
|
||||
}
|
||||
|
||||
if b.count == 0 {
|
||||
b.stream = b.stream[1:]
|
||||
b.valid--
|
||||
bitmask := uint64(1) << b.valid
|
||||
return (b.buffer & bitmask) != 0, nil
|
||||
}
|
||||
|
||||
if len(b.stream) == 0 {
|
||||
return false, io.EOF
|
||||
func (b *bstreamReader) readBits(nbits uint8) (uint64, error) {
|
||||
if b.valid == 0 {
|
||||
if !b.loadNextBuffer(nbits) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
b.count = 8
|
||||
}
|
||||
|
||||
d := (b.stream[0] << (8 - b.count)) & 0x80
|
||||
b.count--
|
||||
return d != 0, nil
|
||||
}
|
||||
if nbits <= b.valid {
|
||||
return b.readBitsFast(nbits)
|
||||
}
|
||||
|
||||
func (b *bstream) ReadByte() (byte, error) {
|
||||
return b.readByte()
|
||||
}
|
||||
// We have to read all remaining valid bits from the current buffer and a part from the next one.
|
||||
bitmask := (uint64(1) << b.valid) - 1
|
||||
nbits -= b.valid
|
||||
v := (b.buffer & bitmask) << nbits
|
||||
b.valid = 0
|
||||
|
||||
func (b *bstream) readByte() (byte, error) {
|
||||
if len(b.stream) == 0 {
|
||||
if !b.loadNextBuffer(nbits) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
if b.count == 0 {
|
||||
b.stream = b.stream[1:]
|
||||
bitmask = (uint64(1) << nbits) - 1
|
||||
v = v | ((b.buffer >> (b.valid - nbits)) & bitmask)
|
||||
b.valid -= nbits
|
||||
|
||||
if len(b.stream) == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
return b.stream[0], nil
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
|
||||
if b.count == 8 {
|
||||
b.count = 0
|
||||
return b.stream[0], nil
|
||||
}
|
||||
|
||||
byt := b.stream[0] << (8 - b.count)
|
||||
b.stream = b.stream[1:]
|
||||
|
||||
if len(b.stream) == 0 {
|
||||
// readBitsFast is like readBits but can return io.EOF if the internal buffer is empty.
|
||||
// If it returns io.EOF, the caller should retry by calling readBits().
|
||||
// This function must be kept small and a leaf in order to help the compiler inline it
|
||||
// and further improve performance.
|
||||
func (b *bstreamReader) readBitsFast(nbits uint8) (uint64, error) {
|
||||
if nbits > b.valid {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
// We just advanced the stream and can assume the shift to be 0.
|
||||
byt |= b.stream[0] >> b.count
|
||||
bitmask := (uint64(1) << nbits) - 1
|
||||
b.valid -= nbits
|
||||
|
||||
return byt, nil
|
||||
return (b.buffer >> b.valid) & bitmask, nil
|
||||
}
|
||||
|
||||
func (b *bstream) readBits(nbits int) (uint64, error) {
|
||||
var u uint64
|
||||
|
||||
for nbits >= 8 {
|
||||
byt, err := b.readByte()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
u = (u << 8) | uint64(byt)
|
||||
nbits -= 8
|
||||
func (b *bstreamReader) ReadByte() (byte, error) {
|
||||
v, err := b.readBits(8)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if nbits == 0 {
|
||||
return u, nil
|
||||
}
|
||||
|
||||
if nbits > int(b.count) {
|
||||
u = (u << uint(b.count)) | uint64((b.stream[0]<<(8-b.count))>>(8-b.count))
|
||||
nbits -= int(b.count)
|
||||
b.stream = b.stream[1:]
|
||||
|
||||
if len(b.stream) == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
b.count = 8
|
||||
}
|
||||
|
||||
u = (u << uint(nbits)) | uint64((b.stream[0]<<(8-b.count))>>(8-uint(nbits)))
|
||||
b.count -= uint8(nbits)
|
||||
return u, nil
|
||||
return byte(v), nil
|
||||
}
|
||||
|
||||
// loadNextBuffer loads the next bytes from the stream into the internal buffer.
|
||||
// The input nbits is the minimum number of bits that must be read, but the implementation
|
||||
// can read more (if possible) to improve performance.
|
||||
func (b *bstreamReader) loadNextBuffer(nbits uint8) bool {
|
||||
if b.streamOffset >= len(b.stream) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Handle the case where there are more than 8 bytes left in the stream (most common case)
|
||||
// in an optimized way. It's guaranteed that this branch will never read from the
|
||||
// very last byte of the stream (which suffers race conditions due to concurrent
|
||||
// writes).
|
||||
if b.streamOffset+8 < len(b.stream) {
|
||||
// This is ugly, but significantly faster.
|
||||
b.buffer =
|
||||
((uint64(b.stream[b.streamOffset])) << 56) |
|
||||
((uint64(b.stream[b.streamOffset+1])) << 48) |
|
||||
((uint64(b.stream[b.streamOffset+2])) << 40) |
|
||||
((uint64(b.stream[b.streamOffset+3])) << 32) |
|
||||
((uint64(b.stream[b.streamOffset+4])) << 24) |
|
||||
((uint64(b.stream[b.streamOffset+5])) << 16) |
|
||||
((uint64(b.stream[b.streamOffset+6])) << 8) |
|
||||
uint64(b.stream[b.streamOffset+7])
|
||||
|
||||
b.streamOffset += 8
|
||||
b.valid = 64
|
||||
return true
|
||||
}
|
||||
|
||||
// We're here if there are 8 or fewer bytes left in the stream. Since this reader needs
|
||||
// to handle race conditions with concurrent writes happening on the very last byte
|
||||
// we make sure to never load more than the minimum requested bits (rounded up to
|
||||
// the next byte). The following code is slower but called less frequently.
|
||||
nbytes := int((nbits / 8) + 1)
|
||||
if b.streamOffset+nbytes > len(b.stream) {
|
||||
nbytes = len(b.stream) - b.streamOffset
|
||||
}
|
||||
|
||||
buffer := uint64(0)
|
||||
for i := 0; i < nbytes; i++ {
|
||||
buffer = buffer | (uint64(b.stream[b.streamOffset+i]) << uint(8*(nbytes-i-1)))
|
||||
}
|
||||
|
||||
b.buffer = buffer
|
||||
b.streamOffset += nbytes
|
||||
b.valid = uint8(nbytes * 8)
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// The code in this file was largely written by Damian Gryski as part of
|
||||
// https://github.com/dgryski/go-tsz and published under the license below.
|
||||
// It received minor modifications to suit Prometheus's needs.
|
||||
|
||||
// Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
|
||||
// All rights reserved.
|
||||
|
||||
// Redistribution and use in source and binary forms, with or without
|
||||
// modification, are permitted provided that the following conditions are met:
|
||||
|
||||
// * Redistributions of source code must retain the above copyright notice,
|
||||
// this list of conditions and the following disclaimer.
|
||||
//
|
||||
// * Redistributions in binary form must reproduce the above copyright notice,
|
||||
// this list of conditions and the following disclaimer in the documentation
|
||||
// and/or other materials provided with the distribution.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
package chunkenc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
func TestBstreamReader(t *testing.T) {
|
||||
// Write to the bit stream.
|
||||
w := bstream{}
|
||||
for _, bit := range []bit{true, false} {
|
||||
w.writeBit(bit)
|
||||
}
|
||||
for nbits := 1; nbits <= 64; nbits++ {
|
||||
w.writeBits(uint64(nbits), nbits)
|
||||
}
|
||||
for v := 1; v < 10000; v += 123 {
|
||||
w.writeBits(uint64(v), 29)
|
||||
}
|
||||
|
||||
// Read back.
|
||||
r := newBReader(w.bytes())
|
||||
for _, bit := range []bit{true, false} {
|
||||
v, err := r.readBitFast()
|
||||
if err != nil {
|
||||
v, err = r.readBit()
|
||||
}
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, bit, v)
|
||||
}
|
||||
for nbits := uint8(1); nbits <= 64; nbits++ {
|
||||
v, err := r.readBitsFast(nbits)
|
||||
if err != nil {
|
||||
v, err = r.readBits(nbits)
|
||||
}
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, uint64(nbits), v, "nbits=%d", nbits)
|
||||
}
|
||||
for v := 1; v < 10000; v += 123 {
|
||||
actual, err := r.readBitsFast(29)
|
||||
if err != nil {
|
||||
actual, err = r.readBits(29)
|
||||
}
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, uint64(v), actual, "v=%d", v)
|
||||
}
|
||||
}
|
|
@ -240,7 +240,7 @@ func (a *xorAppender) writeVDelta(v float64) {
|
|||
}
|
||||
|
||||
type xorIterator struct {
|
||||
br bstream
|
||||
br bstreamReader
|
||||
numTotal uint16
|
||||
numRead uint16
|
||||
|
||||
|
@ -328,7 +328,10 @@ func (it *xorIterator) Next() bool {
|
|||
// read delta-of-delta
|
||||
for i := 0; i < 4; i++ {
|
||||
d <<= 1
|
||||
bit, err := it.br.readBit()
|
||||
bit, err := it.br.readBitFast()
|
||||
if err != nil {
|
||||
bit, err = it.br.readBit()
|
||||
}
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
|
@ -350,6 +353,7 @@ func (it *xorIterator) Next() bool {
|
|||
case 0x0e:
|
||||
sz = 20
|
||||
case 0x0f:
|
||||
// Do not use fast because it's very unlikely it will succeed.
|
||||
bits, err := it.br.readBits(64)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
|
@ -360,7 +364,10 @@ func (it *xorIterator) Next() bool {
|
|||
}
|
||||
|
||||
if sz != 0 {
|
||||
bits, err := it.br.readBits(int(sz))
|
||||
bits, err := it.br.readBitsFast(sz)
|
||||
if err != nil {
|
||||
bits, err = it.br.readBits(sz)
|
||||
}
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
|
@ -379,7 +386,10 @@ func (it *xorIterator) Next() bool {
|
|||
}
|
||||
|
||||
func (it *xorIterator) readValue() bool {
|
||||
bit, err := it.br.readBit()
|
||||
bit, err := it.br.readBitFast()
|
||||
if err != nil {
|
||||
bit, err = it.br.readBit()
|
||||
}
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
|
@ -388,7 +398,10 @@ func (it *xorIterator) readValue() bool {
|
|||
if bit == zero {
|
||||
// it.val = it.val
|
||||
} else {
|
||||
bit, err := it.br.readBit()
|
||||
bit, err := it.br.readBitFast()
|
||||
if err != nil {
|
||||
bit, err = it.br.readBit()
|
||||
}
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
|
@ -397,14 +410,20 @@ func (it *xorIterator) readValue() bool {
|
|||
// reuse leading/trailing zero bits
|
||||
// it.leading, it.trailing = it.leading, it.trailing
|
||||
} else {
|
||||
bits, err := it.br.readBits(5)
|
||||
bits, err := it.br.readBitsFast(5)
|
||||
if err != nil {
|
||||
bits, err = it.br.readBits(5)
|
||||
}
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
it.leading = uint8(bits)
|
||||
|
||||
bits, err = it.br.readBits(6)
|
||||
bits, err = it.br.readBitsFast(6)
|
||||
if err != nil {
|
||||
bits, err = it.br.readBits(6)
|
||||
}
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
|
@ -417,14 +436,17 @@ func (it *xorIterator) readValue() bool {
|
|||
it.trailing = 64 - it.leading - mbits
|
||||
}
|
||||
|
||||
mbits := int(64 - it.leading - it.trailing)
|
||||
bits, err := it.br.readBits(mbits)
|
||||
mbits := 64 - it.leading - it.trailing
|
||||
bits, err := it.br.readBitsFast(mbits)
|
||||
if err != nil {
|
||||
bits, err = it.br.readBits(mbits)
|
||||
}
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
vbits := math.Float64bits(it.val)
|
||||
vbits ^= (bits << it.trailing)
|
||||
vbits ^= bits << it.trailing
|
||||
it.val = math.Float64frombits(vbits)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue