Improve persisting chunks to disk.

This is done by bucketing chunks by fingerprint. If the persisting to
disk falls behind, more and more chunks are in the queue. As soon as
there are "double hits", we will now persist both chunks in one go,
doubling the disk throughput (assuming it is limited by disk
seeks). Should even more pile up so that we end wit "triple hits", we
will persist those first, and so on.

Even if we have millions of time series, this will still help,
assuming not all of them are growing with the same speed. Series that
get many samples and/or are not very compressable will accumulate
chunks faster, and they will soon get double- or triple-writes.

To improve the chance of double writes,
-storage.local.persistence-queue-capacity could be set to a higher
value. However, that will slow down shutdown a lot (as the queue has
to be worked through). So we leave it to the user to set it to a
really high value. A more fundamental solution would be to checkpoint
not only head chunks, but also chunks still in the persist queue. That
would be quite complicated for a rather limited use-case (running many
time series with high ingestion rate on slow spinning disks).
pull/532/head
beorn7 10 years ago
parent 498e1b5154
commit af91fb8e31

@ -57,7 +57,7 @@ var (
numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 128*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop.")
persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 32*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop.")
checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")

@ -84,7 +84,7 @@ type indexingOp struct {
// A Persistence is used by a Storage implementation to store samples
// persistently across restarts. The methods are only goroutine-safe if
// explicitly marked as such below. The chunk-related methods persistChunk,
// explicitly marked as such below. The chunk-related methods persistChunks,
// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
// each other if each call refers to a different fingerprint.
type persistence struct {
@ -283,35 +283,35 @@ func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clie
return lvs, nil
}
// persistChunk persists a single chunk of a series. It is the caller's
// responsibility to not modify the chunk concurrently and to not persist or
// drop anything for the same fingerprint concurrently. It returns the
// (zero-based) index of the persisted chunk within the series file. In case of
// an error, the returned index is -1 (to avoid the misconception that the chunk
// was written at position 0).
func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, error) {
// 1. Open chunk file.
// persistChunks persists a number of consecutive chunks of a series. It is the
// caller's responsibility to not modify the chunks concurrently and to not
// persist or drop anything for the same fingerprint concurrently. It returns
// the (zero-based) index of the first persisted chunk within the series
// file. In case of an error, the returned index is -1 (to avoid the
// misconception that the chunk was written at position 0).
func (p *persistence) persistChunks(fp clientmodel.Fingerprint, chunks []chunk) (int, error) {
f, err := p.openChunkFileForWriting(fp)
if err != nil {
return -1, err
}
defer f.Close()
b := bufio.NewWriterSize(f, chunkHeaderLen+p.chunkLen)
b := bufio.NewWriterSize(f, len(chunks)*(chunkHeaderLen+p.chunkLen))
// 2. Write the header (chunk type and first/last times).
err = writeChunkHeader(b, c)
if err != nil {
return -1, err
}
for _, c := range chunks {
err = writeChunkHeader(b, c)
if err != nil {
return -1, err
}
// 3. Write chunk into file.
err = c.marshal(b)
if err != nil {
return -1, err
err = c.marshal(b)
if err != nil {
return -1, err
}
}
// 4. Determine index within the file.
// Determine index within the file.
b.Flush()
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
@ -322,7 +322,7 @@ func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, er
return -1, err
}
return index - 1, err
return index - len(chunks), err
}
// loadChunks loads a group of chunks of a timeseries by their index. The chunk

@ -83,7 +83,7 @@ func TestPersistLoadDropChunks(t *testing.T) {
for fp, chunks := range fpToChunks {
for i, c := range chunks {
index, err := p.persistChunk(fp, c)
index, err := p.persistChunks(fp, []chunk{c})
if err != nil {
t.Fatal(err)
}

@ -76,9 +76,11 @@ type memorySeriesStorage struct {
appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue.
appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed.
persistQueue chan persistRequest
persistStopped chan struct{}
persistence *persistence
persistQueue chan persistRequest
persistQueueCap int // Not actually the cap of above channel. See handlePersistQueue.
persistStopped chan struct{}
persistence *persistence
countPersistedHeadChunks chan struct{}
@ -145,9 +147,13 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
appendLastTimestamp: clientmodel.Earliest,
appendQueue: make(chan *clientmodel.Sample, appendQueueCap),
persistQueue: make(chan persistRequest, o.PersistenceQueueCapacity),
persistStopped: make(chan struct{}),
persistence: p,
// The actual buffering happens within handlePersistQueue, so
// cap of persistQueue just has to be enough to not block while
// handlePersistQueue is writing to disk (20ms or so).
persistQueue: make(chan persistRequest, 1024),
persistQueueCap: o.PersistenceQueueCapacity,
persistStopped: make(chan struct{}),
persistence: p,
countPersistedHeadChunks: make(chan struct{}, 1024),
@ -568,32 +574,100 @@ func (s *memorySeriesStorage) maybeEvict() {
}
func (s *memorySeriesStorage) handlePersistQueue() {
for req := range s.persistQueue {
s.persistQueueLength.Set(float64(len(s.persistQueue)))
start := time.Now()
s.fpLocker.Lock(req.fingerprint)
offset, err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
if series, seriesInMemory := s.fpToSeries.get(req.fingerprint); err == nil && seriesInMemory && series.chunkDescsOffset == -1 {
// This is the first chunk persisted for a newly created
// series that had prior chunks on disk. Finally, we can
// set the chunkDescsOffset.
series.chunkDescsOffset = offset
}
s.fpLocker.Unlock(req.fingerprint)
s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
if err != nil {
s.persistErrors.Inc()
glog.Error("Error persisting chunk: ", err)
s.persistence.setDirty(true)
continue
chunkMaps := chunkMaps{}
chunkCount := 0
persistMostConsecutiveChunks := func() {
fp, cds := chunkMaps.pop()
if err := s.persistChunks(fp, cds); err != nil {
// Need to put chunks back for retry.
for _, cd := range cds {
chunkMaps.add(fp, cd)
}
return
}
req.chunkDesc.unpin(s.evictRequests)
chunkOps.WithLabelValues(persistAndUnpin).Inc()
chunkCount -= len(cds)
}
loop:
for {
if chunkCount >= s.persistQueueCap && chunkCount > 0 {
glog.Warningf("%d chunks queued for persistence. Ingestion pipeline will backlog.", chunkCount)
persistMostConsecutiveChunks()
}
select {
case req, ok := <-s.persistQueue:
if !ok {
break loop
}
chunkMaps.add(req.fingerprint, req.chunkDesc)
chunkCount++
default:
if chunkCount > 0 {
persistMostConsecutiveChunks()
continue loop
}
// If we are here, there is nothing to do right now. So
// just wait for a persist request to come in.
req, ok := <-s.persistQueue
if !ok {
break loop
}
chunkMaps.add(req.fingerprint, req.chunkDesc)
chunkCount++
}
s.persistQueueLength.Set(float64(chunkCount))
}
// Drain all requests.
for _, m := range chunkMaps {
for fp, cds := range m {
if s.persistChunks(fp, cds) == nil {
chunkCount -= len(cds)
if (chunkCount+len(cds))/1000 > chunkCount/1000 {
glog.Infof(
"Still draining persist queue, %d chunks left to persist...",
chunkCount,
)
}
s.persistQueueLength.Set(float64(chunkCount))
}
}
}
glog.Info("Persist queue drained and stopped.")
close(s.persistStopped)
}
func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*chunkDesc) error {
start := time.Now()
chunks := make([]chunk, len(cds))
for i, cd := range cds {
chunks[i] = cd.chunk
}
s.fpLocker.Lock(fp)
offset, err := s.persistence.persistChunks(fp, chunks)
if series, seriesInMemory := s.fpToSeries.get(fp); err == nil && seriesInMemory && series.chunkDescsOffset == -1 {
// This is the first chunk persisted for a newly created
// series that had prior chunks on disk. Finally, we can
// set the chunkDescsOffset.
series.chunkDescsOffset = offset
}
s.fpLocker.Unlock(fp)
s.persistLatency.Observe(float64(time.Since(start)) / float64(time.Microsecond))
if err != nil {
s.persistErrors.Inc()
glog.Error("Error persisting chunks: ", err)
s.persistence.setDirty(true)
return err
}
for _, cd := range cds {
cd.unpin(s.evictRequests)
}
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
return nil
}
// waitForNextFP waits an estimated duration, after which we want to process
// another fingerprint so that we will process all fingerprints in a tenth of
// s.purgeAfter assuming that the system is doing nothing else, e.g. if we want
@ -927,3 +1001,52 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
count := atomic.LoadInt64(&numMemChunks)
ch <- prometheus.MustNewConstMetric(numMemChunksDesc, prometheus.GaugeValue, float64(count))
}
// chunkMaps is a slice of maps with chunkDescs to be persisted.
// Each chunk map contains n consecutive chunks to persist, where
// n is the index+1.
type chunkMaps []map[clientmodel.Fingerprint][]*chunkDesc
// add adds a chunk to chunkMaps.
func (cm *chunkMaps) add(fp clientmodel.Fingerprint, cd *chunkDesc) {
// Runtime of this method is linear with the number of
// chunkMaps. However, we expect only ever very few maps.
numMaps := len(*cm)
for i, m := range *cm {
if cds, ok := m[fp]; ok {
// Found our fp! Add cd and level up.
cds = append(cds, cd)
delete(m, fp)
if i == numMaps-1 {
*cm = append(*cm, map[clientmodel.Fingerprint][]*chunkDesc{})
}
(*cm)[i+1][fp] = cds
return
}
}
// Our fp isn't contained in cm yet. Add it to the first map (and add a
// first map if there is none).
if numMaps == 0 {
*cm = chunkMaps{map[clientmodel.Fingerprint][]*chunkDesc{}}
}
(*cm)[0][fp] = []*chunkDesc{cd}
}
// pop retrieves and removes a fingerprint with all its chunks. It chooses one
// of the fingerprints with the most chunks. It panics if cm has no entries.
func (cm *chunkMaps) pop() (clientmodel.Fingerprint, []*chunkDesc) {
m := (*cm)[len(*cm)-1]
for fp, cds := range m {
delete(m, fp)
// Prune empty maps from top level.
for len(m) == 0 {
*cm = (*cm)[:len(*cm)-1]
if len(*cm) == 0 {
break
}
m = (*cm)[len(*cm)-1]
}
return fp, cds
}
panic("popped from empty chunkMaps")
}

@ -16,12 +16,15 @@ package local
import (
"fmt"
"math/rand"
"reflect"
"testing"
"testing/quick"
"time"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility/test"
)
@ -662,3 +665,93 @@ func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge
}
return result
}
func TestChunkMaps(t *testing.T) {
cm := chunkMaps{}
cd1 := &chunkDesc{refCount: 1} // Abuse refCount as identifier.
cd21 := &chunkDesc{refCount: 21}
cd22 := &chunkDesc{refCount: 22}
cd31 := &chunkDesc{refCount: 31}
cd32 := &chunkDesc{refCount: 32}
cd33 := &chunkDesc{refCount: 33}
cd41 := &chunkDesc{refCount: 41}
cd42 := &chunkDesc{refCount: 42}
cd43 := &chunkDesc{refCount: 43}
cd44 := &chunkDesc{refCount: 44}
cd51 := &chunkDesc{refCount: 51}
cd52 := &chunkDesc{refCount: 52}
cd53 := &chunkDesc{refCount: 53}
cd54 := &chunkDesc{refCount: 54}
cd55 := &chunkDesc{refCount: 55}
cm.add(5, cd51)
cm.add(3, cd31)
cm.add(5, cd52)
cm.add(1, cd1)
cm.add(4, cd41)
cm.add(4, cd42)
cm.add(5, cd53)
cm.add(3, cd32)
cm.add(2, cd21)
cm.add(5, cd54)
cm.add(3, cd33)
cm.add(4, cd43)
cm.add(2, cd22)
cm.add(4, cd44)
cm.add(5, cd55)
var fpWant, fpGot clientmodel.Fingerprint
var cdsWant, cdsGot []*chunkDesc
fpWant = 5
cdsWant = []*chunkDesc{cd51, cd52, cd53, cd54, cd55}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 4
cdsWant = []*chunkDesc{cd41, cd42, cd43, cd44}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 3
cdsWant = []*chunkDesc{cd31, cd32, cd33}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 2
cdsWant = []*chunkDesc{cd21, cd22}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
fpWant = 1
cdsWant = []*chunkDesc{cd1}
fpGot, cdsGot = cm.pop()
if fpWant != fpGot {
t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
}
if !reflect.DeepEqual(cdsWant, cdsGot) {
t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
}
}

@ -41,7 +41,7 @@ func NewTestStorage(t test.T) (Storage, test.Closer) {
directory := test.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{
MemoryChunks: 1000000,
PersistenceRetentionPeriod: 24 * 7 * time.Hour,
PersistenceRetentionPeriod: 24 * time.Hour * 365 * 100, // Enough to never trigger purging.
PersistenceStoragePath: directory.Path(),
CheckpointInterval: time.Hour,
}

Loading…
Cancel
Save