mirror of https://github.com/prometheus/prometheus
Curation should not starve user-interactive ops.
The background curation should be staggered to ensure that disk I/O yields to user-interactive operations in a timely manner. The lack of prioritization between routines makes this explicit yielding necessary.

Change-Id: I9b498a74ccd933ffb856e06fedc167430e521d86
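As a rough illustration of the yielding technique this change introduces, here is a minimal, self-contained Go sketch. The harness (the key list, the timings in main, and the framing) is hypothetical; only the if len(queue) > 0 { sleep } probe mirrors the actual diff below, where watermarkScanner.Operate checks the occupancy of TieredStorage's view queue.

// Hypothetical stand-alone sketch of "curation yields to interactive ops".
package main

import (
    "fmt"
    "time"
)

const curationYieldPeriod = 250 * time.Millisecond

// viewJob stands in for the storage package's view-request type.
type viewJob struct{ name string }

// curate plays the role of watermarkScanner.Operate: before working on each
// key, it probes the interactive queue and backs off if a query is waiting.
func curate(keys []string, viewQueue chan viewJob) {
    for _, key := range keys {
        // len() on a buffered channel is a cheap occupancy snapshot; nonzero
        // means at least one user-facing request wants the disk.
        if len(viewQueue) > 0 {
            time.Sleep(curationYieldPeriod)
        }
        fmt.Println("curated", key) // stand-in for the real disk work
    }
}

func main() {
    viewQueue := make(chan viewJob, 10)
    viewQueue <- viewJob{name: "user query"} // a pending interactive request

    // An interactive consumer drains the queue shortly after startup.
    go func() {
        time.Sleep(100 * time.Millisecond)
        <-viewQueue
    }()

    curate([]string{"series-a", "series-b", "series-c"}, viewQueue)
}

Note the yield is a single bounded sleep per key rather than a wait-until-empty loop, so curation keeps making forward progress even under sustained query load.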
parent 93a8d03221
commit 12d5e6ca5a
main.go | 4 ++++
main.go
@@ -114,6 +114,8 @@ func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
 
 	curator := metric.Curator{
 		Stop: p.stopBackgroundOperations,
+
+		ViewQueue: p.storage.ViewQueue,
 	}
 
 	return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
@@ -129,6 +131,8 @@ func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
 
 	curator := metric.Curator{
 		Stop: p.stopBackgroundOperations,
+
+		ViewQueue: p.storage.ViewQueue,
 	}
 
 	return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
storage/metric/curator.go
@@ -32,6 +32,8 @@ import (
 	dto "github.com/prometheus/prometheus/model/generated"
 )
 
+const curationYieldPeriod = 250 * time.Millisecond
+
 // CurationStateUpdater receives updates about the curation state.
 type CurationStateUpdater interface {
 	UpdateCurationState(*CurationState)
@@ -54,6 +56,8 @@ type Curator struct {
 	// The moment a value is ingested inside of it, the curator goes into drain
 	// mode.
 	Stop chan bool
+
+	ViewQueue chan viewJob
 }
 
 // watermarkScanner converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
@@ -89,6 +93,8 @@ type watermarkScanner struct {
 	status CurationStateUpdater
 
 	firstBlock, lastBlock *SampleKey
+
+	ViewQueue chan viewJob
 }
 
 // run facilitates the curation lifecycle.
@@ -146,6 +152,8 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
 
 		firstBlock: firstBlock,
 		lastBlock:  lastBlock,
+
+		ViewQueue: c.ViewQueue,
 	}
 
 	// Right now, the ability to stop a curation is limited to the beginning of
@@ -276,6 +284,10 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm
 }
 
 func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
+	if len(w.ViewQueue) > 0 {
+		time.Sleep(curationYieldPeriod)
+	}
+
 	fingerprint := key.(*clientmodel.Fingerprint)
 
 	if fingerprint.Less(w.firstBlock.Fingerprint) {
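A note on the Operate guard above: len(w.ViewQueue) is a lock-free, best-effort snapshot of the buffered channel's occupancy, and it can be stale the moment it is read. That is acceptable here; a stale value costs at most one superfluous curationYieldPeriod pause or one missed yield for a single key, and no correctness property depends on the check, so the cheap probe beats any synchronization around the queue.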
storage/metric/tiered.go
@@ -82,7 +82,7 @@ type TieredStorage struct {
 	memoryTTL           time.Duration
 	flushMemoryInterval time.Duration
 
-	viewQueue chan viewJob
+	ViewQueue chan viewJob
 
 	draining chan chan<- bool
 
@@ -131,7 +131,7 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
 		flushMemoryInterval: flushMemoryInterval,
 		memoryArena:         NewMemorySeriesStorage(memOptions),
 		memoryTTL:           memoryTTL,
-		viewQueue:           make(chan viewJob, viewQueueDepth),
+		ViewQueue:           make(chan viewJob, viewQueueDepth),
 
 		memorySemaphore: make(chan bool, tieredMemorySemaphores),
 
@@ -194,7 +194,7 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
 	abortChan := make(chan bool, 1)
 	errChan := make(chan error)
 	queryStats.GetTimer(stats.ViewQueueTime).Start()
-	t.viewQueue <- viewJob{
+	t.ViewQueue <- viewJob{
 		builder: builder,
 		output:  result,
 		abort:   abortChan,
@@ -240,7 +240,7 @@ func (t *TieredStorage) Serve(started chan<- bool) {
 		select {
 		case <-flushMemoryTicker.C:
 			t.flushMemory(t.memoryTTL)
-		case viewRequest := <-t.viewQueue:
+		case viewRequest := <-t.ViewQueue:
 			viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop()
 			<-t.memorySemaphore
 			go t.renderView(viewRequest)
@@ -256,8 +256,8 @@ func (t *TieredStorage) reportQueues() {
 	queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
 	queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
 
-	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.viewQueue)))
-	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.viewQueue)))
+	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.ViewQueue)))
+	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.ViewQueue)))
 }
 
 func (t *TieredStorage) Flush() {
@@ -306,7 +306,7 @@ func (t *TieredStorage) close() {
 	// BUG(matt): There is a probability that pending items may hang here and not
 	// get flushed.
 	close(t.appendToDiskQueue)
-	close(t.viewQueue)
+	close(t.ViewQueue)
 	t.wmCache.Clear()
 
 	t.state = tieredStorageStopping
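The TieredStorage hunks are mechanical: viewQueue becomes the exported ViewQueue so the queue is reachable from package main, where the curator is wired up (ViewQueue: p.storage.ViewQueue in the main.go hunks above). The queue's depth, construction, and shutdown semantics are unchanged.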