mirror of https://github.com/prometheus/prometheus
Allow checkpoints and maintenance to happen concurrently. (#2321)
This is essential on larger Prometheus servers, as otherwise checkpoints prevent sufficient persisting of chunks to disk.pull/2341/head
parent
1dcb7637f5
commit
f64c231dad
|
@ -1252,7 +1252,7 @@ func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fing
|
||||||
func (s *MemorySeriesStorage) loop() {
|
func (s *MemorySeriesStorage) loop() {
|
||||||
checkpointTimer := time.NewTimer(s.checkpointInterval)
|
checkpointTimer := time.NewTimer(s.checkpointInterval)
|
||||||
|
|
||||||
dirtySeriesCount := 0
|
var dirtySeriesCount int64
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
checkpointTimer.Stop()
|
checkpointTimer.Stop()
|
||||||
|
@ -1263,18 +1263,23 @@ func (s *MemorySeriesStorage) loop() {
|
||||||
memoryFingerprints := s.cycleThroughMemoryFingerprints()
|
memoryFingerprints := s.cycleThroughMemoryFingerprints()
|
||||||
archivedFingerprints := s.cycleThroughArchivedFingerprints()
|
archivedFingerprints := s.cycleThroughArchivedFingerprints()
|
||||||
|
|
||||||
loop:
|
// Checkpoints can happen concurrently with maintenance so even with heavy
|
||||||
|
// checkpointing there will still be sufficient progress on maintenance.
|
||||||
|
checkpointLoopStopped := make(chan struct{})
|
||||||
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.loopStopping:
|
case <-s.loopStopping:
|
||||||
break loop
|
checkpointLoopStopped <- struct{}{}
|
||||||
|
return
|
||||||
case <-checkpointTimer.C:
|
case <-checkpointTimer.C:
|
||||||
|
// We clear this before the checkpoint so that dirtySeriesCount
|
||||||
|
// is an upper bound.
|
||||||
|
atomic.StoreInt64(&dirtySeriesCount, 0)
|
||||||
|
s.dirtySeries.Set(0)
|
||||||
err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
|
err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorln("Error while checkpointing:", err)
|
log.Errorln("Error while checkpointing:", err)
|
||||||
} else {
|
|
||||||
dirtySeriesCount = 0
|
|
||||||
s.dirtySeries.Set(0)
|
|
||||||
}
|
}
|
||||||
// If a checkpoint takes longer than checkpointInterval, unluckily timed
|
// If a checkpoint takes longer than checkpointInterval, unluckily timed
|
||||||
// combination with the Reset(0) call below can lead to a case where a
|
// combination with the Reset(0) call below can lead to a case where a
|
||||||
|
@ -1284,17 +1289,26 @@ loop:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
checkpointTimer.Reset(s.checkpointInterval)
|
checkpointTimer.Reset(s.checkpointInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.loopStopping:
|
||||||
|
break loop
|
||||||
case fp := <-memoryFingerprints:
|
case fp := <-memoryFingerprints:
|
||||||
if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
|
if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
|
||||||
dirtySeriesCount++
|
dirty := atomic.AddInt64(&dirtySeriesCount, 1)
|
||||||
s.dirtySeries.Inc()
|
s.dirtySeries.Set(float64(dirty))
|
||||||
// Check if we have enough "dirty" series so that we need an early checkpoint.
|
// Check if we have enough "dirty" series so that we need an early checkpoint.
|
||||||
// However, if we are already behind persisting chunks, creating a checkpoint
|
// However, if we are already behind persisting chunks, creating a checkpoint
|
||||||
// would be counterproductive, as it would slow down chunk persisting even more,
|
// would be counterproductive, as it would slow down chunk persisting even more,
|
||||||
// while in a situation like that, where we are clearly lacking speed of disk
|
// while in a situation like that, where we are clearly lacking speed of disk
|
||||||
// maintenance, the best we can do for crash recovery is to persist chunks as
|
// maintenance, the best we can do for crash recovery is to persist chunks as
|
||||||
// quickly as possible. So only checkpoint if the urgency score is < 1.
|
// quickly as possible. So only checkpoint if the urgency score is < 1.
|
||||||
if dirtySeriesCount >= s.checkpointDirtySeriesLimit &&
|
if dirty >= int64(s.checkpointDirtySeriesLimit) &&
|
||||||
s.calculatePersistenceUrgencyScore() < 1 {
|
s.calculatePersistenceUrgencyScore() < 1 {
|
||||||
checkpointTimer.Reset(0)
|
checkpointTimer.Reset(0)
|
||||||
}
|
}
|
||||||
|
@ -1308,6 +1322,7 @@ loop:
|
||||||
}
|
}
|
||||||
for range archivedFingerprints {
|
for range archivedFingerprints {
|
||||||
}
|
}
|
||||||
|
<-checkpointLoopStopped
|
||||||
}
|
}
|
||||||
|
|
||||||
// maintainMemorySeries maintains a series that is in memory (i.e. not
|
// maintainMemorySeries maintains a series that is in memory (i.e. not
|
||||||
|
|
Loading…
Reference in New Issue