tsdb/chunkenc.Pool: Refactor Get and Put

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
pull/14043/head
Arve Knudsen 2024-01-17 18:28:06 +01:00
parent a25160e6a4
commit 108a6bc9f6
7 changed files with 150 additions and 44 deletions

View File

@ -52,6 +52,12 @@ type bstream struct {
count uint8 // How many right-most bits are available for writing in the current byte (the last byte of the stream).
}
// Reset resets b around stream.
func (b *bstream) Reset(stream []byte) {
b.stream = stream
b.count = 0
}
func (b *bstream) bytes() []byte {
return b.stream
}

View File

@ -19,6 +19,19 @@ import (
"github.com/stretchr/testify/require"
)
func TestBstream_Reset(t *testing.T) {
bs := bstream{
stream: []byte("test"),
count: 10,
}
bs.Reset([]byte("was reset"))
require.Equal(t, bstream{
stream: []byte("was reset"),
count: 0,
}, bs)
}
func TestBstreamReader(t *testing.T) {
// Write to the bit stream.
w := bstream{}

View File

@ -87,6 +87,9 @@ type Chunk interface {
// There's no strong guarantee that no samples will be appended once
// Compact() is called. Implementing this function is optional.
Compact()
// Reset resets the chunk given stream.
Reset(stream []byte)
}
type Iterable interface {
@ -303,64 +306,47 @@ func NewPool() Pool {
}
func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
var c Chunk
switch e {
case EncXOR:
c := p.xor.Get().(*XORChunk)
c.b.stream = b
c.b.count = 0
return c, nil
c = p.xor.Get().(*XORChunk)
case EncHistogram:
c := p.histogram.Get().(*HistogramChunk)
c.b.stream = b
c.b.count = 0
return c, nil
c = p.histogram.Get().(*HistogramChunk)
case EncFloatHistogram:
c := p.floatHistogram.Get().(*FloatHistogramChunk)
c.b.stream = b
c.b.count = 0
return c, nil
c = p.floatHistogram.Get().(*FloatHistogramChunk)
default:
return nil, fmt.Errorf("invalid chunk encoding %q", e)
}
return nil, fmt.Errorf("invalid chunk encoding %q", e)
c.Reset(b)
return c, nil
}
func (p *pool) Put(c Chunk) error {
var sp *sync.Pool
var ok bool
switch c.Encoding() {
case EncXOR:
xc, ok := c.(*XORChunk)
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}
xc.b.stream = nil
xc.b.count = 0
p.xor.Put(c)
_, ok = c.(*XORChunk)
sp = &p.xor
case EncHistogram:
sh, ok := c.(*HistogramChunk)
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}
sh.b.stream = nil
sh.b.count = 0
p.histogram.Put(c)
_, ok = c.(*HistogramChunk)
sp = &p.histogram
case EncFloatHistogram:
sh, ok := c.(*FloatHistogramChunk)
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}
sh.b.stream = nil
sh.b.count = 0
p.floatHistogram.Put(c)
_, ok = c.(*FloatHistogramChunk)
sp = &p.floatHistogram
default:
return fmt.Errorf("invalid chunk encoding %q", c.Encoding())
}
if !ok {
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
return nil
}
c.Reset(nil)
sp.Put(c)
return nil
}

View File

@ -110,6 +110,96 @@ func testChunk(t *testing.T, c Chunk) {
require.Equal(t, ValNone, it3.Seek(exp[len(exp)-1].t+1))
}
func TestPool(t *testing.T) {
p := NewPool()
for _, tc := range []struct {
name string
encoding Encoding
expErr error
}{
{
name: "xor",
encoding: EncXOR,
},
{
name: "histogram",
encoding: EncHistogram,
},
{
name: "float histogram",
encoding: EncFloatHistogram,
},
{
name: "invalid encoding",
encoding: EncNone,
expErr: fmt.Errorf(`invalid chunk encoding "none"`),
},
} {
t.Run(tc.name, func(t *testing.T) {
c, err := p.Get(tc.encoding, []byte("test"))
if tc.expErr != nil {
require.EqualError(t, err, tc.expErr.Error())
return
}
require.NoError(t, err)
var b *bstream
switch tc.encoding {
case EncHistogram:
b = &c.(*HistogramChunk).b
case EncFloatHistogram:
b = &c.(*FloatHistogramChunk).b
default:
b = &c.(*XORChunk).b
}
require.Equal(t, &bstream{
stream: []byte("test"),
count: 0,
}, b)
b.count = 1
require.NoError(t, p.Put(c))
require.Equal(t, &bstream{
stream: nil,
count: 0,
}, b)
})
}
t.Run("put bad chunk wrapper", func(t *testing.T) {
// When a wrapping chunk poses as an encoding it can't be converted to, Put should skip it.
c := fakeChunk{
encoding: EncXOR,
t: t,
}
require.NoError(t, p.Put(c))
})
t.Run("put invalid encoding", func(t *testing.T) {
c := fakeChunk{
encoding: EncNone,
t: t,
}
require.EqualError(t, p.Put(c), `invalid chunk encoding "none"`)
})
}
type fakeChunk struct {
Chunk
encoding Encoding
t *testing.T
}
func (c fakeChunk) Encoding() Encoding {
return c.encoding
}
func (c fakeChunk) Reset([]byte) {
c.t.Fatal("Reset should not be called")
}
func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
const samplesPerChunk = 250
var (

View File

@ -44,6 +44,10 @@ func NewFloatHistogramChunk() *FloatHistogramChunk {
return &FloatHistogramChunk{b: bstream{stream: b, count: 0}}
}
func (c *FloatHistogramChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// xorValue holds all the necessary information to encode
// and decode XOR encoded float64 values.
type xorValue struct {

View File

@ -45,6 +45,10 @@ func NewHistogramChunk() *HistogramChunk {
return &HistogramChunk{b: bstream{stream: b, count: 0}}
}
func (c *HistogramChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// Encoding returns the encoding type.
func (c *HistogramChunk) Encoding() Encoding {
return EncHistogram

View File

@ -66,6 +66,10 @@ func NewXORChunk() *XORChunk {
return &XORChunk{b: bstream{stream: b, count: 0}}
}
func (c *XORChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// Encoding returns the encoding type.
func (c *XORChunk) Encoding() Encoding {
return EncXOR
@ -171,7 +175,6 @@ func (a *xorAppender) Append(t int64, v float64) {
}
a.writeVDelta(v)
default:
tDelta = uint64(t - a.t)
dod := int64(tDelta - a.tDelta)