diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 58110c350..e1bac7f8b 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -192,6 +192,7 @@ type chunk interface { newIterator() chunkIterator marshal(io.Writer) error unmarshal(io.Reader) error + unmarshalFromBuf([]byte) encoding() chunkEncoding // values returns a channel, from which all sample values in the chunk // can be received in order. The channel is closed after the last diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index 104766d16..16950ff30 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -191,8 +191,8 @@ func (p *persistence) sanitizeSeries( return fp, false } - bytesToTrim := fi.Size() % int64(chunkLen+chunkHeaderLen) - chunksInFile := int(fi.Size()) / (chunkLen + chunkHeaderLen) + bytesToTrim := fi.Size() % int64(chunkLenWithHeader) + chunksInFile := int(fi.Size()) / chunkLenWithHeader modTime := fi.ModTime() if bytesToTrim != 0 { glog.Warningf( diff --git a/storage/local/delta.go b/storage/local/delta.go index 0e4816935..1be169feb 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -223,18 +223,20 @@ func (c deltaEncodedChunk) marshal(w io.Writer) error { // unmarshal implements chunk. func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { *c = (*c)[:cap(*c)] - readBytes := 0 - for readBytes < len(*c) { - n, err := r.Read((*c)[readBytes:]) - if err != nil { - return err - } - readBytes += n + if _, err := io.ReadFull(r, *c); err != nil { + return err } *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] return nil } +// unmarshalFromBuf implements chunk. +func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) { + *c = (*c)[:cap(*c)] + copy(*c, buf) + *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] +} + // values implements chunk. 
func (c deltaEncodedChunk) values() <-chan *metric.SamplePair { n := c.len() diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index a1c9ad224..dcfd155ed 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -231,18 +231,20 @@ func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { // unmarshal implements chunk. func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { *c = (*c)[:cap(*c)] - readBytes := 0 - for readBytes < len(*c) { - n, err := r.Read((*c)[readBytes:]) - if err != nil { - return err - } - readBytes += n + if _, err := io.ReadFull(r, *c); err != nil { + return err } *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] return nil } +// unmarshalFromBuf implements chunk. +func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) { + *c = (*c)[:cap(*c)] + copy(*c, buf) + *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] +} + // values implements chunk. func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair { n := c.len() diff --git a/storage/local/persistence.go b/storage/local/persistence.go index e4b0e29c3..2cce4cf73 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -63,6 +63,8 @@ const ( chunkHeaderTypeOffset = 0 chunkHeaderFirstTimeOffset = 1 chunkHeaderLastTimeOffset = 9 + chunkLenWithHeader = chunkLen + chunkHeaderLen + chunkMaxBatchSize = 64 // How many chunks to load at most in one batch. indexingMaxBatchSize = 1024 * 1024 indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long. @@ -122,6 +124,8 @@ type persistence struct { fLock flock.Releaser // The file lock to protect against concurrent usage. shouldSync syncStrategy + + bufPool sync.Pool } // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. 
@@ -233,6 +237,10 @@ func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync sync dirtyFileName: dirtyPath, fLock: fLock, shouldSync: shouldSync, + // Create buffers of capacity 3*chunkLenWithHeader by default because that is still reasonably small + // and at the same time enough for many uses. The contract is to never return a buffer with a smaller capacity than + // that to the pool so that callers can rely on a minimum buffer capacity. + bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }}, } if p.dirty { @@ -377,28 +385,39 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde defer f.Close() chunks := make([]chunk, 0, len(indexes)) - typeBuf := make([]byte, 1) - for _, idx := range indexes { - _, err := f.Seek(offsetForChunkIndex(idx+indexOffset), os.SEEK_SET) - if err != nil { + buf := p.bufPool.Get().([]byte) + defer func() { + // buf may change below, so wrap returning to the pool in a function. + // A simple 'defer p.bufPool.Put(buf)' would only return the original buf. + p.bufPool.Put(buf) + }() + + for i := 0; i < len(indexes); i++ { + // This loads chunks in batches. A batch is a streak of + // consecutive chunks, read from disk in one go. 
+ batchSize := 1 + if _, err := f.Seek(offsetForChunkIndex(indexes[i]+indexOffset), os.SEEK_SET); err != nil { return nil, err } - n, err := f.Read(typeBuf) - if err != nil { - return nil, err + for ; batchSize < chunkMaxBatchSize && + i+1 < len(indexes) && + indexes[i]+1 == indexes[i+1]; i, batchSize = i+1, batchSize+1 { } - if n != 1 { - panic("read returned != 1 bytes") + readSize := batchSize * chunkLenWithHeader + if cap(buf) < readSize { + buf = make([]byte, readSize) } + buf = buf[:readSize] - _, err = f.Seek(chunkHeaderLen-1, os.SEEK_CUR) - if err != nil { + if _, err := io.ReadFull(f, buf); err != nil { return nil, err } - chunk := newChunkForEncoding(chunkEncoding(typeBuf[0])) - chunk.unmarshal(f) - chunks = append(chunks, chunk) + for c := 0; c < batchSize; c++ { + chunk := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) + chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]) + chunks = append(chunks, chunk) + } } chunkOps.WithLabelValues(load).Add(float64(len(chunks))) atomic.AddInt64(&numMemChunks, int64(len(chunks))) @@ -422,24 +441,23 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie if err != nil { return nil, err } - totalChunkLen := chunkHeaderLen + chunkLen - if fi.Size()%int64(totalChunkLen) != 0 { + if fi.Size()%int64(chunkLenWithHeader) != 0 { p.setDirty(true) return nil, fmt.Errorf( "size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d", - fp, fi.Size(), totalChunkLen, + fp, fi.Size(), chunkLenWithHeader, ) } - numChunks := int(fi.Size()) / totalChunkLen + numChunks := int(fi.Size()) / chunkLenWithHeader cds := make([]*chunkDesc, 0, numChunks) + chunkTimesBuf := make([]byte, 16) for i := 0; i < numChunks; i++ { _, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) if err != nil { return nil, err } - chunkTimesBuf := make([]byte, 16) _, err = io.ReadAtLeast(f, chunkTimesBuf, 16) if err != nil { 
return nil, err @@ -799,7 +817,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in } chunk := newChunkForEncoding(chunkEncoding(encoding)) if err := chunk.unmarshal(r); err != nil { - glog.Warning("Could not decode chunk type:", err) + glog.Warning("Could not decode chunk:", err) p.dirty = true return sm, chunksToPersist, nil } @@ -900,7 +918,7 @@ func (p *persistence) dropAndPersistChunks( return } headerBuf := make([]byte, chunkHeaderLen) - _, err = io.ReadAtLeast(f, headerBuf, chunkHeaderLen) + _, err = io.ReadFull(f, headerBuf) if err == io.EOF { // We ran into the end of the file without finding any chunks that should // be kept. Remove the whole file. @@ -960,7 +978,7 @@ func (p *persistence) dropAndPersistChunks( if err != nil { return } - offset = int(written / (chunkHeaderLen + chunkLen)) + offset = int(written / chunkLenWithHeader) if len(chunks) > 0 { if err = writeChunks(temp, chunks); err != nil { @@ -983,7 +1001,7 @@ func (p *persistence) deleteSeriesFile(fp clientmodel.Fingerprint) (int, error) if err != nil { return -1, err } - numChunks := int(fi.Size() / (chunkHeaderLen + chunkLen)) + numChunks := int(fi.Size() / chunkLenWithHeader) if err := os.Remove(fname); err != nil { return -1, err } @@ -1366,17 +1384,17 @@ loop: } func offsetForChunkIndex(i int) int64 { - return int64(i * (chunkHeaderLen + chunkLen)) + return int64(i * chunkLenWithHeader) } func chunkIndexForOffset(offset int64) (int, error) { - if int(offset)%(chunkHeaderLen+chunkLen) != 0 { + if int(offset)%chunkLenWithHeader != 0 { return -1, fmt.Errorf( "offset %d is not a multiple of on-disk chunk length %d", - offset, chunkHeaderLen+chunkLen, + offset, chunkLenWithHeader, ) } - return int(offset) / (chunkHeaderLen + chunkLen), nil + return int(offset) / chunkLenWithHeader, nil } func writeChunkHeader(w io.Writer, c chunk) error { @@ -1389,7 +1407,7 @@ func writeChunkHeader(w io.Writer, c chunk) error { } func writeChunks(w io.Writer, chunks []chunk) 
error { - b := bufio.NewWriterSize(w, len(chunks)*(chunkHeaderLen+chunkLen)) + b := bufio.NewWriterSize(w, len(chunks)*chunkLenWithHeader) for _, chunk := range chunks { if err := writeChunkHeader(b, chunk); err != nil { return err diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index d6b75d2ee..fc21b8d24 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -15,6 +15,7 @@ package local import ( "reflect" + "sync" "testing" clientmodel "github.com/prometheus/client_golang/model" @@ -894,6 +895,7 @@ var fpStrings = []string{ func BenchmarkLoadChunksSequentially(b *testing.B) { p := persistence{ basePath: "fixtures", + bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }}, } sequentialIndexes := make([]int, 47) for i := range sequentialIndexes { @@ -918,6 +920,7 @@ func BenchmarkLoadChunksSequentially(b *testing.B) { func BenchmarkLoadChunksRandomly(b *testing.B) { p := persistence{ basePath: "fixtures", + bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }}, } randomIndexes := []int{1, 5, 6, 8, 11, 14, 18, 23, 29, 33, 42, 46} diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 2f53b77ee..6ebbb1a36 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -625,7 +625,6 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if archived { t.Fatal("archived") } - fmt.Println(series.headChunkClosed, len(series.chunkDescs)) // This will archive again, but must not drop it completely, despite the // memorySeries being empty.