diff --git a/chunks/chunk.go b/chunks/chunk.go index ee0b4c597..75f2a5c62 100644 --- a/chunks/chunk.go +++ b/chunks/chunk.go @@ -2,7 +2,6 @@ package chunks import ( "encoding/binary" - "errors" "fmt" ) @@ -25,12 +24,6 @@ const ( EncXOR ) -var ( - // ErrChunkFull is returned if the remaining size of a chunk cannot - // fit the appended data. - ErrChunkFull = errors.New("chunk full") -) - // Chunk holds a sequence of sample pairs that can be iterated over and appended to. type Chunk interface { Bytes() []byte @@ -51,31 +44,14 @@ func FromData(e Encoding, d []byte) (Chunk, error) { return nil, fmt.Errorf("unknown chunk encoding: %d", e) } -// Iterator provides iterating access over sample pairs in chunks. -type Iterator interface { - StreamingIterator - - // Seek(t int64) bool - // SeekBefore(t int64) bool - // Next() bool - // Values() (int64, float64) - // Err() error -} - // Appender adds sample pairs to a chunk. type Appender interface { - Append(int64, float64) error + Append(int64, float64) } -// StreamingIterator is a simple iterator that can only get the next value. -type StreamingIterator interface { +// Iterator is a simple iterator that can only get the next value. +type Iterator interface { Values() (int64, float64) Err() error Next() bool } - -// fancyIterator wraps a StreamingIterator and implements a regular -// Iterator with it. -type fancyIterator struct { - StreamingIterator -} diff --git a/chunks/chunk_test.go b/chunks/chunk_test.go index 1fde02a6f..d7a1698de 100644 --- a/chunks/chunk_test.go +++ b/chunks/chunk_test.go @@ -16,12 +16,12 @@ type pair struct { } func TestChunk(t *testing.T) { - for enc, nc := range map[Encoding]func(sz int) Chunk{ - EncXOR: func(sz int) Chunk { return NewXORChunk(sz) }, + for enc, nc := range map[Encoding]func() Chunk{ + EncXOR: func() Chunk { return NewXORChunk() }, } { t.Run(fmt.Sprintf("%s", enc), func(t *testing.T) { for range make([]struct{}, 1) { - c := nc(rand.Intn(1024)) + c := nc() if err := testChunk(c); err != nil { t.Fatal(err) } @@ -59,13 +59,7 @@ func testChunk(c Chunk) error { } } - err = app.Append(ts, v) - if err != nil { - if err == ErrChunkFull { - break - } - return err - } + app.Append(ts, v) exp = append(exp, pair{t: ts, v: v}) // fmt.Println("appended", len(c.Bytes()), c.Bytes()) } @@ -85,7 +79,7 @@ func testChunk(c Chunk) error { return nil } -func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) { +func benchmarkIterator(b *testing.B, newChunk func() Chunk) { var ( t = int64(1234123324) v = 1243535.123 @@ -101,19 +95,20 @@ func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) { var chunks []Chunk for i := 0; i < b.N; { - c := newChunk(1024) + c := newChunk() a, err := c.Appender() if err != nil { b.Fatalf("get appender: %s", err) } + j := 0 for _, p := range exp { - if err := a.Append(p.t, p.v); err == ErrChunkFull { + if j > 250 { break - } else if err != nil { - b.Fatal(err) } + a.Append(p.t, p.v) i++ + j++ } chunks = append(chunks, c) } @@ -141,18 +136,18 @@ func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) { } func BenchmarkXORIterator(b *testing.B) { - benchmarkIterator(b, func(sz int) Chunk { - return NewXORChunk(sz) + benchmarkIterator(b, func() Chunk { + return NewXORChunk() }) } func BenchmarkXORAppender(b *testing.B) { - benchmarkAppender(b, func(sz int) Chunk { - return NewXORChunk(sz) + benchmarkAppender(b, func() Chunk { + return NewXORChunk() }) } -func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) { +func benchmarkAppender(b *testing.B, newChunk func() Chunk) { var ( t = int64(1234123324) v = 1243535.123 @@ -171,19 +166,20 @@ func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) { var chunks []Chunk for i := 0; i < b.N; { - c := newChunk(1024) + c := newChunk() a, err := c.Appender() if err != nil { b.Fatalf("get appender: %s", err) } + j := 0 for _, p := range exp { - if err := a.Append(p.t, p.v); err == ErrChunkFull { + if j > 250 { break - } else if err != nil { - b.Fatal(err) } + a.Append(p.t, p.v) i++ + j++ } chunks = append(chunks, c) } diff --git a/chunks/xor.go b/chunks/xor.go index 08a30ec4d..6e750c097 100644 --- a/chunks/xor.go +++ b/chunks/xor.go @@ -11,16 +11,14 @@ import ( type XORChunk struct { b *bstream num uint16 - sz int } // NewXORChunk returns a new chunk with XOR encoding of the given size. -func NewXORChunk(size int) *XORChunk { +func NewXORChunk() *XORChunk { b := make([]byte, 2, 128) return &XORChunk{ b: &bstream{stream: b, count: 0}, - sz: size, num: 0, } } @@ -78,7 +76,7 @@ func (c *XORChunk) iterator() *xorIterator { // Iterator implements the Chunk interface. func (c *XORChunk) Iterator() Iterator { - return fancyIterator{c.iterator()} + return c.iterator() } type xorAppender struct { @@ -93,9 +91,8 @@ type xorAppender struct { trailing uint8 } -func (a *xorAppender) Append(t int64, v float64) error { +func (a *xorAppender) Append(t int64, v float64) { var tDelta uint64 - l := len(a.b.bytes()) if a.c.num == 0 { buf := make([]byte, binary.MaxVarintLen64) @@ -140,19 +137,10 @@ func (a *xorAppender) Append(t int64, v float64) error { a.writeVDelta(v) } - if len(a.b.bytes()) > a.c.sz { - // If the appended data exceeded the size limit, we truncate - // the underlying data slice back to the length we started with. - a.b.stream = a.b.stream[:l] - return ErrChunkFull - } - a.t = t a.v = v a.c.num++ a.tDelta = tDelta - - return nil } func bitRange(x int64, nbits uint8) bool { diff --git a/db.go b/db.go index 2b4923984..d9382364c 100644 --- a/db.go +++ b/db.go @@ -413,23 +413,15 @@ type chunkDesc struct { app chunks.Appender // Current appender for the chunks. } -func (cd *chunkDesc) append(ts int64, v float64) (err error) { - if cd.app == nil { - cd.app, err = cd.chunk.Appender() - if err != nil { - return err - } +func (cd *chunkDesc) append(ts int64, v float64) { + if cd.numSamples == 0 { cd.firsTimestamp = ts } - if err := cd.app.Append(ts, v); err != nil { - return err - } + cd.app.Append(ts, v) cd.lastTimestamp = ts cd.lastValue = v cd.numSamples++ - - return nil } // The MultiError type implements the error interface, and contains the diff --git a/head.go b/head.go index 424446599..a58dd9af4 100644 --- a/head.go +++ b/head.go @@ -1,7 +1,6 @@ package tsdb import ( - "math" "os" "sort" "sync" @@ -51,9 +50,7 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) { b.create(lset.Hash(), lset) }, sample: func(s hashedSample) { - if err := b.descs[s.ref].append(s.t, s.v); err != nil { - panic(err) // TODO(fabxc): cannot actually error - } + b.descs[s.ref].append(s.t, s.v) b.stats.SampleCount++ }, }) @@ -151,9 +148,16 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) (*chunkDesc, uint32) { } func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { + var err error + cd := &chunkDesc{ lset: lset, - chunk: chunks.NewXORChunk(int(math.MaxInt64)), + chunk: chunks.NewXORChunk(), + } + cd.app, err = cd.chunk.Appender() + if err != nil { + // Getting an Appender for a new chunk must not panic. + panic(err) } // Index the new chunk. ref := len(h.descs) @@ -226,13 +230,8 @@ func (h *HeadBlock) appendBatch(samples []hashedSample, appended prometheus.Coun h.create(newHashes[i], s) } - var merr MultiError for _, s := range samples { - // TODO(fabxc): ensure that this won't be able to actually error in practice. - if err := h.descs[s.ref].append(s.t, s.v); err != nil { - merr.Add(err) - continue - } + h.descs[s.ref].append(s.t, s.v) appended.Inc() h.stats.SampleCount++ @@ -242,7 +241,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample, appended prometheus.Coun } } - return merr.Err() + return nil } func (h *HeadBlock) persist(p string) (int64, error) {