Merge pull request #14043 from aknuds1/arve/chunkenc-refactor

tsdb/chunkenc.Pool: Refactor Get and Put
pull/14068/head
Björn Rabenstein 2024-05-08 15:03:18 +02:00 committed by GitHub
commit 178935d0e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 150 additions and 44 deletions

View File

@ -52,6 +52,12 @@ type bstream struct {
count uint8 // How many right-most bits are available for writing in the current byte (the last byte of the stream). count uint8 // How many right-most bits are available for writing in the current byte (the last byte of the stream).
} }
// Reset resets b around stream.
func (b *bstream) Reset(stream []byte) {
b.stream = stream
b.count = 0
}
func (b *bstream) bytes() []byte { func (b *bstream) bytes() []byte {
return b.stream return b.stream
} }

View File

@ -19,6 +19,19 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestBstream_Reset(t *testing.T) {
bs := bstream{
stream: []byte("test"),
count: 10,
}
bs.Reset([]byte("was reset"))
require.Equal(t, bstream{
stream: []byte("was reset"),
count: 0,
}, bs)
}
func TestBstreamReader(t *testing.T) { func TestBstreamReader(t *testing.T) {
// Write to the bit stream. // Write to the bit stream.
w := bstream{} w := bstream{}

View File

@ -87,6 +87,9 @@ type Chunk interface {
// There's no strong guarantee that no samples will be appended once // There's no strong guarantee that no samples will be appended once
// Compact() is called. Implementing this function is optional. // Compact() is called. Implementing this function is optional.
Compact() Compact()
// Reset resets the chunk given stream.
Reset(stream []byte)
} }
type Iterable interface { type Iterable interface {
@ -303,64 +306,47 @@ func NewPool() Pool {
} }
func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
var c Chunk
switch e { switch e {
case EncXOR: case EncXOR:
c := p.xor.Get().(*XORChunk) c = p.xor.Get().(*XORChunk)
c.b.stream = b
c.b.count = 0
return c, nil
case EncHistogram: case EncHistogram:
c := p.histogram.Get().(*HistogramChunk) c = p.histogram.Get().(*HistogramChunk)
c.b.stream = b
c.b.count = 0
return c, nil
case EncFloatHistogram: case EncFloatHistogram:
c := p.floatHistogram.Get().(*FloatHistogramChunk) c = p.floatHistogram.Get().(*FloatHistogramChunk)
c.b.stream = b default:
c.b.count = 0 return nil, fmt.Errorf("invalid chunk encoding %q", e)
return c, nil
} }
return nil, fmt.Errorf("invalid chunk encoding %q", e)
c.Reset(b)
return c, nil
} }
func (p *pool) Put(c Chunk) error { func (p *pool) Put(c Chunk) error {
var sp *sync.Pool
var ok bool
switch c.Encoding() { switch c.Encoding() {
case EncXOR: case EncXOR:
xc, ok := c.(*XORChunk) _, ok = c.(*XORChunk)
// This may happen often with wrapped chunks. Nothing we can really do about sp = &p.xor
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}
xc.b.stream = nil
xc.b.count = 0
p.xor.Put(c)
case EncHistogram: case EncHistogram:
sh, ok := c.(*HistogramChunk) _, ok = c.(*HistogramChunk)
// This may happen often with wrapped chunks. Nothing we can really do about sp = &p.histogram
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}
sh.b.stream = nil
sh.b.count = 0
p.histogram.Put(c)
case EncFloatHistogram: case EncFloatHistogram:
sh, ok := c.(*FloatHistogramChunk) _, ok = c.(*FloatHistogramChunk)
// This may happen often with wrapped chunks. Nothing we can really do about sp = &p.floatHistogram
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}
sh.b.stream = nil
sh.b.count = 0
p.floatHistogram.Put(c)
default: default:
return fmt.Errorf("invalid chunk encoding %q", c.Encoding()) return fmt.Errorf("invalid chunk encoding %q", c.Encoding())
} }
if !ok {
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
return nil
}
c.Reset(nil)
sp.Put(c)
return nil return nil
} }

View File

@ -110,6 +110,96 @@ func testChunk(t *testing.T, c Chunk) {
require.Equal(t, ValNone, it3.Seek(exp[len(exp)-1].t+1)) require.Equal(t, ValNone, it3.Seek(exp[len(exp)-1].t+1))
} }
func TestPool(t *testing.T) {
p := NewPool()
for _, tc := range []struct {
name string
encoding Encoding
expErr error
}{
{
name: "xor",
encoding: EncXOR,
},
{
name: "histogram",
encoding: EncHistogram,
},
{
name: "float histogram",
encoding: EncFloatHistogram,
},
{
name: "invalid encoding",
encoding: EncNone,
expErr: fmt.Errorf(`invalid chunk encoding "none"`),
},
} {
t.Run(tc.name, func(t *testing.T) {
c, err := p.Get(tc.encoding, []byte("test"))
if tc.expErr != nil {
require.EqualError(t, err, tc.expErr.Error())
return
}
require.NoError(t, err)
var b *bstream
switch tc.encoding {
case EncHistogram:
b = &c.(*HistogramChunk).b
case EncFloatHistogram:
b = &c.(*FloatHistogramChunk).b
default:
b = &c.(*XORChunk).b
}
require.Equal(t, &bstream{
stream: []byte("test"),
count: 0,
}, b)
b.count = 1
require.NoError(t, p.Put(c))
require.Equal(t, &bstream{
stream: nil,
count: 0,
}, b)
})
}
t.Run("put bad chunk wrapper", func(t *testing.T) {
// When a wrapping chunk poses as an encoding it can't be converted to, Put should skip it.
c := fakeChunk{
encoding: EncXOR,
t: t,
}
require.NoError(t, p.Put(c))
})
t.Run("put invalid encoding", func(t *testing.T) {
c := fakeChunk{
encoding: EncNone,
t: t,
}
require.EqualError(t, p.Put(c), `invalid chunk encoding "none"`)
})
}
type fakeChunk struct {
Chunk
encoding Encoding
t *testing.T
}
func (c fakeChunk) Encoding() Encoding {
return c.encoding
}
func (c fakeChunk) Reset([]byte) {
c.t.Fatal("Reset should not be called")
}
func benchmarkIterator(b *testing.B, newChunk func() Chunk) { func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
const samplesPerChunk = 250 const samplesPerChunk = 250
var ( var (

View File

@ -44,6 +44,10 @@ func NewFloatHistogramChunk() *FloatHistogramChunk {
return &FloatHistogramChunk{b: bstream{stream: b, count: 0}} return &FloatHistogramChunk{b: bstream{stream: b, count: 0}}
} }
func (c *FloatHistogramChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// xorValue holds all the necessary information to encode // xorValue holds all the necessary information to encode
// and decode XOR encoded float64 values. // and decode XOR encoded float64 values.
type xorValue struct { type xorValue struct {

View File

@ -45,6 +45,10 @@ func NewHistogramChunk() *HistogramChunk {
return &HistogramChunk{b: bstream{stream: b, count: 0}} return &HistogramChunk{b: bstream{stream: b, count: 0}}
} }
func (c *HistogramChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// Encoding returns the encoding type. // Encoding returns the encoding type.
func (c *HistogramChunk) Encoding() Encoding { func (c *HistogramChunk) Encoding() Encoding {
return EncHistogram return EncHistogram

View File

@ -66,6 +66,10 @@ func NewXORChunk() *XORChunk {
return &XORChunk{b: bstream{stream: b, count: 0}} return &XORChunk{b: bstream{stream: b, count: 0}}
} }
func (c *XORChunk) Reset(stream []byte) {
c.b.Reset(stream)
}
// Encoding returns the encoding type. // Encoding returns the encoding type.
func (c *XORChunk) Encoding() Encoding { func (c *XORChunk) Encoding() Encoding {
return EncXOR return EncXOR
@ -171,7 +175,6 @@ func (a *xorAppender) Append(t int64, v float64) {
} }
a.writeVDelta(v) a.writeVDelta(v)
default: default:
tDelta = uint64(t - a.t) tDelta = uint64(t - a.t)
dod := int64(tDelta - a.tDelta) dod := int64(tDelta - a.tDelta)