Merge pull request #635 from prometheus/beorn7/persistence

Improve chunk and chunkDesc loading.
pull/640/head
Björn Rabenstein 2015-04-14 16:11:24 +02:00
commit ad7ef406d3
7 changed files with 70 additions and 45 deletions

View File

@ -192,6 +192,7 @@ type chunk interface {
newIterator() chunkIterator newIterator() chunkIterator
marshal(io.Writer) error marshal(io.Writer) error
unmarshal(io.Reader) error unmarshal(io.Reader) error
unmarshalFromBuf([]byte)
encoding() chunkEncoding encoding() chunkEncoding
// values returns a channel, from which all sample values in the chunk // values returns a channel, from which all sample values in the chunk
// can be received in order. The channel is closed after the last // can be received in order. The channel is closed after the last

View File

@ -191,8 +191,8 @@ func (p *persistence) sanitizeSeries(
return fp, false return fp, false
} }
bytesToTrim := fi.Size() % int64(chunkLen+chunkHeaderLen) bytesToTrim := fi.Size() % int64(chunkLenWithHeader)
chunksInFile := int(fi.Size()) / (chunkLen + chunkHeaderLen) chunksInFile := int(fi.Size()) / chunkLenWithHeader
modTime := fi.ModTime() modTime := fi.ModTime()
if bytesToTrim != 0 { if bytesToTrim != 0 {
glog.Warningf( glog.Warningf(

View File

@ -223,18 +223,20 @@ func (c deltaEncodedChunk) marshal(w io.Writer) error {
// unmarshal implements chunk. // unmarshal implements chunk.
func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)] *c = (*c)[:cap(*c)]
readBytes := 0 if _, err := io.ReadFull(r, *c); err != nil {
for readBytes < len(*c) { return err
n, err := r.Read((*c)[readBytes:])
if err != nil {
return err
}
readBytes += n
} }
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
return nil return nil
} }
// unmarshalFromBuf implements chunk.
func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) {
*c = (*c)[:cap(*c)]
copy(*c, buf)
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
}
// values implements chunk. // values implements chunk.
func (c deltaEncodedChunk) values() <-chan *metric.SamplePair { func (c deltaEncodedChunk) values() <-chan *metric.SamplePair {
n := c.len() n := c.len()

View File

@ -231,18 +231,20 @@ func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
// unmarshal implements chunk. // unmarshal implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)] *c = (*c)[:cap(*c)]
readBytes := 0 if _, err := io.ReadFull(r, *c); err != nil {
for readBytes < len(*c) { return err
n, err := r.Read((*c)[readBytes:])
if err != nil {
return err
}
readBytes += n
} }
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
return nil return nil
} }
// unmarshalFromBuf implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) {
*c = (*c)[:cap(*c)]
copy(*c, buf)
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
}
// values implements chunk. // values implements chunk.
func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair { func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair {
n := c.len() n := c.len()

View File

@ -63,6 +63,8 @@ const (
chunkHeaderTypeOffset = 0 chunkHeaderTypeOffset = 0
chunkHeaderFirstTimeOffset = 1 chunkHeaderFirstTimeOffset = 1
chunkHeaderLastTimeOffset = 9 chunkHeaderLastTimeOffset = 9
chunkLenWithHeader = chunkLen + chunkHeaderLen
chunkMaxBatchSize = 64 // How many chunks to load at most in one batch.
indexingMaxBatchSize = 1024 * 1024 indexingMaxBatchSize = 1024 * 1024
indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long. indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long.
@ -122,6 +124,8 @@ type persistence struct {
fLock flock.Releaser // The file lock to protect against concurrent usage. fLock flock.Releaser // The file lock to protect against concurrent usage.
shouldSync syncStrategy shouldSync syncStrategy
bufPool sync.Pool
} }
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
@ -233,6 +237,10 @@ func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync sync
dirtyFileName: dirtyPath, dirtyFileName: dirtyPath,
fLock: fLock, fLock: fLock,
shouldSync: shouldSync, shouldSync: shouldSync,
// Create buffers of length 3*chunkLenWithHeader by default because that is still reasonably small
// and at the same time enough for many uses. The contract is to never return buffer smaller than
// that to the pool so that callers can rely on a minimum buffer size.
bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }},
} }
if p.dirty { if p.dirty {
@ -377,28 +385,39 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde
defer f.Close() defer f.Close()
chunks := make([]chunk, 0, len(indexes)) chunks := make([]chunk, 0, len(indexes))
typeBuf := make([]byte, 1) buf := p.bufPool.Get().([]byte)
for _, idx := range indexes { defer func() {
_, err := f.Seek(offsetForChunkIndex(idx+indexOffset), os.SEEK_SET) // buf may change below, so wrap returning to the pool in a function.
if err != nil { // A simple 'defer p.bufPool.Put(buf)' would only return the original buf.
p.bufPool.Put(buf)
}()
for i := 0; i < len(indexes); i++ {
// This loads chunks in batches. A batch is a streak of
// consecutive chunks, read from disk in one go.
batchSize := 1
if _, err := f.Seek(offsetForChunkIndex(indexes[i]+indexOffset), os.SEEK_SET); err != nil {
return nil, err return nil, err
} }
n, err := f.Read(typeBuf) for ; batchSize < chunkMaxBatchSize &&
if err != nil { i+1 < len(indexes) &&
return nil, err indexes[i]+1 == indexes[i+1]; i, batchSize = i+1, batchSize+1 {
} }
if n != 1 { readSize := batchSize * chunkLenWithHeader
panic("read returned != 1 bytes") if cap(buf) < readSize {
buf = make([]byte, readSize)
} }
buf = buf[:readSize]
_, err = f.Seek(chunkHeaderLen-1, os.SEEK_CUR) if _, err := io.ReadFull(f, buf); err != nil {
if err != nil {
return nil, err return nil, err
} }
chunk := newChunkForEncoding(chunkEncoding(typeBuf[0])) for c := 0; c < batchSize; c++ {
chunk.unmarshal(f) chunk := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
chunks = append(chunks, chunk) chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:])
chunks = append(chunks, chunk)
}
} }
chunkOps.WithLabelValues(load).Add(float64(len(chunks))) chunkOps.WithLabelValues(load).Add(float64(len(chunks)))
atomic.AddInt64(&numMemChunks, int64(len(chunks))) atomic.AddInt64(&numMemChunks, int64(len(chunks)))
@ -422,24 +441,23 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
if err != nil { if err != nil {
return nil, err return nil, err
} }
totalChunkLen := chunkHeaderLen + chunkLen if fi.Size()%int64(chunkLenWithHeader) != 0 {
if fi.Size()%int64(totalChunkLen) != 0 {
p.setDirty(true) p.setDirty(true)
return nil, fmt.Errorf( return nil, fmt.Errorf(
"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d", "size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
fp, fi.Size(), totalChunkLen, fp, fi.Size(), chunkLenWithHeader,
) )
} }
numChunks := int(fi.Size()) / totalChunkLen numChunks := int(fi.Size()) / chunkLenWithHeader
cds := make([]*chunkDesc, 0, numChunks) cds := make([]*chunkDesc, 0, numChunks)
chunkTimesBuf := make([]byte, 16)
for i := 0; i < numChunks; i++ { for i := 0; i < numChunks; i++ {
_, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) _, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
if err != nil { if err != nil {
return nil, err return nil, err
} }
chunkTimesBuf := make([]byte, 16)
_, err = io.ReadAtLeast(f, chunkTimesBuf, 16) _, err = io.ReadAtLeast(f, chunkTimesBuf, 16)
if err != nil { if err != nil {
return nil, err return nil, err
@ -799,7 +817,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
} }
chunk := newChunkForEncoding(chunkEncoding(encoding)) chunk := newChunkForEncoding(chunkEncoding(encoding))
if err := chunk.unmarshal(r); err != nil { if err := chunk.unmarshal(r); err != nil {
glog.Warning("Could not decode chunk type:", err) glog.Warning("Could not decode chunk:", err)
p.dirty = true p.dirty = true
return sm, chunksToPersist, nil return sm, chunksToPersist, nil
} }
@ -900,7 +918,7 @@ func (p *persistence) dropAndPersistChunks(
return return
} }
headerBuf := make([]byte, chunkHeaderLen) headerBuf := make([]byte, chunkHeaderLen)
_, err = io.ReadAtLeast(f, headerBuf, chunkHeaderLen) _, err = io.ReadFull(f, headerBuf)
if err == io.EOF { if err == io.EOF {
// We ran into the end of the file without finding any chunks that should // We ran into the end of the file without finding any chunks that should
// be kept. Remove the whole file. // be kept. Remove the whole file.
@ -960,7 +978,7 @@ func (p *persistence) dropAndPersistChunks(
if err != nil { if err != nil {
return return
} }
offset = int(written / (chunkHeaderLen + chunkLen)) offset = int(written / chunkLenWithHeader)
if len(chunks) > 0 { if len(chunks) > 0 {
if err = writeChunks(temp, chunks); err != nil { if err = writeChunks(temp, chunks); err != nil {
@ -983,7 +1001,7 @@ func (p *persistence) deleteSeriesFile(fp clientmodel.Fingerprint) (int, error)
if err != nil { if err != nil {
return -1, err return -1, err
} }
numChunks := int(fi.Size() / (chunkHeaderLen + chunkLen)) numChunks := int(fi.Size() / chunkLenWithHeader)
if err := os.Remove(fname); err != nil { if err := os.Remove(fname); err != nil {
return -1, err return -1, err
} }
@ -1366,17 +1384,17 @@ loop:
} }
func offsetForChunkIndex(i int) int64 { func offsetForChunkIndex(i int) int64 {
return int64(i * (chunkHeaderLen + chunkLen)) return int64(i * chunkLenWithHeader)
} }
func chunkIndexForOffset(offset int64) (int, error) { func chunkIndexForOffset(offset int64) (int, error) {
if int(offset)%(chunkHeaderLen+chunkLen) != 0 { if int(offset)%chunkLenWithHeader != 0 {
return -1, fmt.Errorf( return -1, fmt.Errorf(
"offset %d is not a multiple of on-disk chunk length %d", "offset %d is not a multiple of on-disk chunk length %d",
offset, chunkHeaderLen+chunkLen, offset, chunkLenWithHeader,
) )
} }
return int(offset) / (chunkHeaderLen + chunkLen), nil return int(offset) / chunkLenWithHeader, nil
} }
func writeChunkHeader(w io.Writer, c chunk) error { func writeChunkHeader(w io.Writer, c chunk) error {
@ -1389,7 +1407,7 @@ func writeChunkHeader(w io.Writer, c chunk) error {
} }
func writeChunks(w io.Writer, chunks []chunk) error { func writeChunks(w io.Writer, chunks []chunk) error {
b := bufio.NewWriterSize(w, len(chunks)*(chunkHeaderLen+chunkLen)) b := bufio.NewWriterSize(w, len(chunks)*chunkLenWithHeader)
for _, chunk := range chunks { for _, chunk := range chunks {
if err := writeChunkHeader(b, chunk); err != nil { if err := writeChunkHeader(b, chunk); err != nil {
return err return err

View File

@ -15,6 +15,7 @@ package local
import ( import (
"reflect" "reflect"
"sync"
"testing" "testing"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
@ -894,6 +895,7 @@ var fpStrings = []string{
func BenchmarkLoadChunksSequentially(b *testing.B) { func BenchmarkLoadChunksSequentially(b *testing.B) {
p := persistence{ p := persistence{
basePath: "fixtures", basePath: "fixtures",
bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }},
} }
sequentialIndexes := make([]int, 47) sequentialIndexes := make([]int, 47)
for i := range sequentialIndexes { for i := range sequentialIndexes {
@ -918,6 +920,7 @@ func BenchmarkLoadChunksSequentially(b *testing.B) {
func BenchmarkLoadChunksRandomly(b *testing.B) { func BenchmarkLoadChunksRandomly(b *testing.B) {
p := persistence{ p := persistence{
basePath: "fixtures", basePath: "fixtures",
bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }},
} }
randomIndexes := []int{1, 5, 6, 8, 11, 14, 18, 23, 29, 33, 42, 46} randomIndexes := []int{1, 5, 6, 8, 11, 14, 18, 23, 29, 33, 42, 46}

View File

@ -625,7 +625,6 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if archived { if archived {
t.Fatal("archived") t.Fatal("archived")
} }
fmt.Println(series.headChunkClosed, len(series.chunkDescs))
// This will archive again, but must not drop it completely, despite the // This will archive again, but must not drop it completely, despite the
// memorySeries being empty. // memorySeries being empty.