From e67cf768dcb320178d82ef82ce474cdcf1916177 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sun, 20 Nov 2016 16:14:21 +0100 Subject: [PATCH] chunks: remove intermeidate copy from xor chunk --- chunks/bstream.go | 209 +++++++++++++++++++++++++++++++++++++++++++++ chunks/chunk.go | 192 ----------------------------------------- chunks/xor.go | 147 ++++++++++++------------------- chunks/xor_test.go | 2 +- 4 files changed, 266 insertions(+), 284 deletions(-) create mode 100644 chunks/bstream.go diff --git a/chunks/bstream.go b/chunks/bstream.go new file mode 100644 index 000000000..c595e0192 --- /dev/null +++ b/chunks/bstream.go @@ -0,0 +1,209 @@ +package chunks + +import ( + "bytes" + "encoding/binary" + "io" +) + +// bstream is a stream of bits +type bstream struct { + // the data stream + stream []byte + + // how many bits are valid in current byte + count uint8 +} + +func newBReader(b []byte) *bstream { + return &bstream{stream: b, count: 8} +} + +func newBWriter(size int) *bstream { + return &bstream{stream: make([]byte, 0, size), count: 0} +} + +func (b *bstream) clone() *bstream { + d := make([]byte, len(b.stream)) + copy(d, b.stream) + return &bstream{stream: d, count: b.count} +} + +func (b *bstream) bytes() []byte { + return b.stream +} + +type bit bool + +const ( + zero bit = false + one bit = true +) + +func (b *bstream) writeBit(bit bit) { + + if b.count == 0 { + b.stream = append(b.stream, 0) + b.count = 8 + } + + i := len(b.stream) - 1 + + if bit { + b.stream[i] |= 1 << (b.count - 1) + } + + b.count-- +} + +func (b *bstream) writeByte(byt byte) { + + if b.count == 0 { + b.stream = append(b.stream, 0) + b.count = 8 + } + + i := len(b.stream) - 1 + + // fill up b.b with b.count bits from byt + b.stream[i] |= byt >> (8 - b.count) + + b.stream = append(b.stream, 0) + i++ + b.stream[i] = byt << b.count +} + +func (b *bstream) writeBits(u uint64, nbits int) { + u <<= (64 - uint(nbits)) + for nbits >= 8 { + byt := byte(u >> 56) + b.writeByte(byt) + u <<= 8 + nbits -= 8 + } + + for nbits > 0 { + b.writeBit((u >> 63) == 1) + u <<= 1 + nbits-- + } +} + +func (b *bstream) readBit() (bit, error) { + + if len(b.stream) == 0 { + return false, io.EOF + } + + if b.count == 0 { + b.stream = b.stream[1:] + // did we just run out of stuff to read? + if len(b.stream) == 0 { + return false, io.EOF + } + b.count = 8 + } + + b.count-- + d := b.stream[0] & 0x80 + b.stream[0] <<= 1 + return d != 0, nil +} + +func (b *bstream) readByte() (byte, error) { + + if len(b.stream) == 0 { + return 0, io.EOF + } + + if b.count == 0 { + b.stream = b.stream[1:] + + if len(b.stream) == 0 { + return 0, io.EOF + } + + b.count = 8 + } + + if b.count == 8 { + b.count = 0 + return b.stream[0], nil + } + + byt := b.stream[0] + b.stream = b.stream[1:] + + if len(b.stream) == 0 { + return 0, io.EOF + } + + byt |= b.stream[0] >> b.count + b.stream[0] <<= (8 - b.count) + + return byt, nil +} + +func (b *bstream) readBits(nbits int) (uint64, error) { + + var u uint64 + + for nbits >= 8 { + byt, err := b.readByte() + if err != nil { + return 0, err + } + + u = (u << 8) | uint64(byt) + nbits -= 8 + } + + if nbits == 0 { + return u, nil + } + + if nbits > int(b.count) { + u = (u << uint(b.count)) | uint64(b.stream[0]>>(8-b.count)) + nbits -= int(b.count) + b.stream = b.stream[1:] + + if len(b.stream) == 0 { + return 0, io.EOF + } + b.count = 8 + } + + u = (u << uint(nbits)) | uint64(b.stream[0]>>(8-uint(nbits))) + b.stream[0] <<= uint(nbits) + b.count -= uint8(nbits) + return u, nil +} + +// MarshalBinary implements the encoding.BinaryMarshaler interface +func (b *bstream) MarshalBinary() ([]byte, error) { + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.BigEndian, b.count) + if err != nil { + return nil, err + } + err = binary.Write(buf, binary.BigEndian, b.stream) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface +func (b *bstream) UnmarshalBinary(bIn []byte) error { + buf := bytes.NewReader(bIn) + err := binary.Read(buf, binary.BigEndian, &b.count) + if err != nil { + return err + } + b.stream = make([]byte, buf.Len()) + err = binary.Read(buf, binary.BigEndian, &b.stream) + if err != nil { + return err + } + return nil +} diff --git a/chunks/chunk.go b/chunks/chunk.go index bcc66c1ff..2a453d81b 100644 --- a/chunks/chunk.go +++ b/chunks/chunk.go @@ -121,198 +121,6 @@ func (c *rawChunk) append(b []byte) error { return nil } -type bitChunk struct { - d []byte - - sz int - pos uint32 // bytes used in the chunk - count uint32 // valid bits in last byte - - // Read copies of above values used when retrieving iterators. - rl uint32 - rcount uint32 -} - -type bit bool - -const ( - zero bit = false - one bit = true -) - -func newBitChunk(sz int, enc Encoding) bitChunk { - c := bitChunk{d: make([]byte, sz+1), pos: 1, count: 8} - c.d[0] = byte(enc) - return c -} - -func (c *bitChunk) encoding() Encoding { - return Encoding(c.d[0]) -} - -func (c *bitChunk) Data() []byte { - return c.d[:c.pos] -} - -func (c *bitChunk) reader() *bitChunkReader { - fmt.Println(len(c.d), c.pos) - return &bitChunkReader{d: c.d[1 : c.pos+1], count: 8} -} - -type bitChunkReader struct { - d []byte - count uint8 - l uint32 -} - -func (r *bitChunkReader) readBit() (bit, error) { - if len(r.d) == 0 { - return false, io.EOF - } - - if r.count == 0 { - r.d = r.d[1:] - // did we just run out of stuff to read? - if len(r.d) == 0 { - return false, io.EOF - } - r.count = 8 - } - - r.count-- - d := r.d[0] & 0x80 - r.d[0] <<= 1 - return d != 0, nil -} - -func (r *bitChunkReader) readByte() (byte, error) { - if len(r.d) == 0 { - return 0, io.EOF - } - - if r.count == 0 { - r.d = r.d[1:] - - if len(r.d) == 0 { - return 0, io.EOF - } - - r.count = 8 - } - - if r.count == 8 { - r.count = 0 - return r.d[0], nil - } - - byt := r.d[0] - r.d = r.d[1:] - - if len(r.d) == 0 { - return 0, io.EOF - } - - byt |= r.d[0] >> r.count - r.d[0] <<= (8 - r.count) - - return byt, nil -} - -func (r *bitChunkReader) readBits(nbits int) (uint64, error) { - var u uint64 - - for nbits >= 8 { - byt, err := r.readByte() - if err != nil { - return 0, err - } - - u = (u << 8) | uint64(byt) - nbits -= 8 - } - - if nbits == 0 { - return u, nil - } - - if nbits > int(r.count) { - u = (u << uint(r.count)) | uint64(r.d[0]>>(8-r.count)) - nbits -= int(r.count) - r.d = r.d[1:] - - if len(r.d) == 0 { - return 0, io.EOF - } - r.count = 8 - } - - u = (u << uint(nbits)) | uint64(r.d[0]>>(8-uint(nbits))) - r.d[0] <<= uint(nbits) - r.count -= uint8(nbits) - return u, nil -} - -// append appends the first nbits bits from b into the chunk. -// b must contain at least nbits bits. -// We are using fixed 16 bytes as it might perform better due to -// more static assumptions. -func (c *bitChunk) append(b [20]byte, nbits int) error { - if nbits > 8*(len(c.d)-int(c.pos)-1)-int(c.count) { - return ErrChunkFull - } - - c.writeBits(b, nbits) - // Swap the working length and count integers into the ones used - // to retrieve iterators. This allows to concurrently retrieve - // iteartors while appending to a chunk. - // This does not make it safe for concurrent appends! - atomic.StoreUint32(&c.rl, c.pos) - atomic.StoreUint32(&c.rcount, c.count) - return nil -} - -func (c *bitChunk) writeBit(bit bit) { - if c.count == 0 { - c.pos++ - c.count = 8 - } - - if bit { - c.d[c.pos] |= 1 << (c.count - 1) - } - - c.count-- -} - -func (c *bitChunk) writeByte(byt byte) { - if c.count == 0 { - c.pos++ - c.count = 8 - } - - // fill up b.b with b.count bits from byt - c.d[c.pos] |= byt >> (8 - c.count) - - c.pos++ - c.d[c.pos] = byt << c.count -} - -func (c *bitChunk) writeBits(b [20]byte, nbits int) { - i := 0 - for nbits >= 8 { - c.writeByte(b[i]) - i++ - nbits -= 8 - } - - bi := b[i] - for nbits > 0 { - c.writeBit((bi >> 7) == 1) - bi <<= 1 - nbits-- - } -} - // PlainChunk implements a Chunk using simple 16 byte representations // of sample pairs. type PlainChunk struct { diff --git a/chunks/xor.go b/chunks/xor.go index d190aa9c4..5766aebc9 100644 --- a/chunks/xor.go +++ b/chunks/xor.go @@ -9,39 +9,46 @@ import ( // XORChunk holds XOR encoded sample data. type XORChunk struct { + bstream + num uint16 - bitChunk + sz int + + lastLen int + lastCount uint8 } // NewXORChunk returns a new chunk with XOR encoding of the given size. func NewXORChunk(sz int) *XORChunk { - return &XORChunk{bitChunk: newBitChunk(sz, EncXOR)} + return &XORChunk{sz: sz} +} + +func (c *XORChunk) Data() []byte { + return nil } // Appender implements the Chunk interface. func (c *XORChunk) Appender() Appender { - return &xorAppender{c: c, pos: 1} + return &xorAppender{c: c} } // Iterator implements the Chunk interface. func (c *XORChunk) Iterator() Iterator { - return &xorIterator{br: c.bitChunk.reader(), numTotal: c.num} + br := c.bstream.clone() + br.count = 8 + return &xorIterator{br: br, numTotal: c.num} } type xorAppender struct { c *XORChunk - t int64 - v float64 - buf [20]byte // bits written for current sample. 17 to avoid if condition in hot path. - pos uint8 // num of bytes in buf - count uint8 // number of bits in last buf byte + t int64 + v float64 + tDelta uint64 leading uint8 trailing uint8 finished bool - - tDelta uint64 } func (a *xorAppender) Append(ts model.Time, v model.SampleValue) error { @@ -50,55 +57,55 @@ func (a *xorAppender) Append(ts model.Time, v model.SampleValue) error { } func (a *xorAppender) append(t int64, v float64) error { - // Reset bit buffer. - a.buf = [20]byte{} - a.count = 8 - a.pos = 0 + var tDelta uint64 + + if a.c.num == 0 { + // TODO: store varint time? + a.c.writeBits(uint64(t), 64) + a.c.writeBits(math.Float64bits(v), 64) - if a.c.num > 1 { - tDelta := uint64(t - a.t) + } else if a.c.num == 1 { + tDelta = uint64(t - a.t) + // TODO: use varint or other encoding for first delta? + a.c.writeBits(tDelta, 64) + a.writeVDelta(v) + + } else { + tDelta = uint64(t - a.t) dod := int64(tDelta - a.tDelta) // Gorilla has a max resolution of seconds, Prometheus milliseconds. // Thus we use higher value range steps with larger bit size. switch { case dod == 0: - a.writeBit(zero) + a.c.writeBit(zero) case -8191 <= dod && dod <= 8192: - a.writeBits(0x02, 2) // '10' - a.writeBits(uint64(dod), 14) + a.c.writeBits(0x02, 2) // '10' + a.c.writeBits(uint64(dod), 14) case -65535 <= dod && dod <= 65536: - a.writeBits(0x06, 3) // '110' - a.writeBits(uint64(dod), 17) + a.c.writeBits(0x06, 3) // '110' + a.c.writeBits(uint64(dod), 17) case -524287 <= dod && dod <= 524288: - a.writeBits(0x0e, 4) // '1110' - a.writeBits(uint64(dod), 20) + a.c.writeBits(0x0e, 4) // '1110' + a.c.writeBits(uint64(dod), 20) default: - a.writeBits(0x0f, 4) // '1111' - a.writeBits(uint64(dod), 64) + a.c.writeBits(0x0f, 4) // '1111' + a.c.writeBits(uint64(dod), 64) } - a.tDelta = tDelta a.writeVDelta(v) - - } else if a.c.num == 0 { - // TODO: store varint time? - a.writeBits(uint64(t), 64) - a.writeBits(math.Float64bits(v), 64) - } else { - a.tDelta = uint64(t - a.t) - // TODO: use varint or other encoding for first delta? - a.writeBits(uint64(a.tDelta), 64) - a.writeVDelta(v) } - if err := a.c.append(a.buf, int(a.pos+1)*8-int(a.count)); err != nil { - return err + if len(a.c.stream) > a.c.sz { + return ErrChunkFull } + a.t = t a.v = v a.c.num++ - // TODO: also preserve tDelta – even though it doesn't really matter at this point. + a.tDelta = tDelta + a.c.lastCount = a.c.count + a.c.lastLen = len(a.c.stream) return nil } @@ -106,10 +113,10 @@ func (a *xorAppender) writeVDelta(v float64) { vDelta := math.Float64bits(v) ^ math.Float64bits(a.v) if vDelta == 0 { - a.writeBit(zero) + a.c.writeBit(zero) return } - a.writeBit(one) + a.c.writeBit(one) leading := uint8(bits.Clz(vDelta)) trailing := uint8(bits.Ctz(vDelta)) @@ -121,67 +128,25 @@ func (a *xorAppender) writeVDelta(v float64) { // TODO(dgryski): check if it's 'cheaper' to reset the leading/trailing bits instead if a.leading != ^uint8(0) && leading >= a.leading && trailing >= a.trailing { - a.writeBit(zero) - a.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing)) + a.c.writeBit(zero) + a.c.writeBits(vDelta>>a.trailing, 64-int(a.leading)-int(a.trailing)) } else { a.leading, a.trailing = leading, trailing - a.writeBit(one) - a.writeBits(uint64(leading), 5) + a.c.writeBit(one) + a.c.writeBits(uint64(leading), 5) // Note that if leading == trailing == 0, then sigbits == 64. But that value doesn't actually fit into the 6 bits we have. // Luckily, we never need to encode 0 significant bits, since that would put us in the other case (vdelta == 0). // So instead we write out a 0 and adjust it back to 64 on unpacking. sigbits := 64 - leading - trailing - a.writeBits(uint64(sigbits), 6) - a.writeBits(vDelta>>trailing, int(sigbits)) - } -} - -func (a *xorAppender) writeBits(u uint64, nbits int) { - u <<= (64 - uint(nbits)) - for nbits >= 8 { - byt := byte(u >> 56) - a.writeByte(byt) - u <<= 8 - nbits -= 8 - } - - for nbits > 0 { - a.writeBit((u >> 63) == 1) - u <<= 1 - nbits-- - } -} - -func (a *xorAppender) writeBit(bit bit) { - if a.count == 0 { - a.pos++ - a.count = 8 - } - - if bit { - a.buf[a.pos] |= 1 << (a.count - 1) + a.c.writeBits(uint64(sigbits), 6) + a.c.writeBits(vDelta>>trailing, int(sigbits)) } - - a.count-- -} - -func (a *xorAppender) writeByte(byt byte) { - if a.count == 0 { - a.pos++ - a.count = 8 - } - - // fill up b.b with b.count bits from byt - a.buf[a.pos] |= byt >> (8 - a.count) - - a.pos++ - a.buf[a.pos] = byt << a.count } type xorIterator struct { - br *bitChunkReader + br *bstream numTotal uint16 numRead uint16 diff --git a/chunks/xor_test.go b/chunks/xor_test.go index cb5c23282..fc95a34a1 100644 --- a/chunks/xor_test.go +++ b/chunks/xor_test.go @@ -55,7 +55,7 @@ func testXORChunk(t *testing.T) { } func TestXORChunk(t *testing.T) { - for i := 0; i < 1000000; i++ { + for i := 0; i < 10; i++ { testXORChunk(t) } }