From 11b3c2387c32448f4c57b1fc045e1b54ddd24c95 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Tue, 10 Feb 2015 14:49:01 +0100 Subject: [PATCH] Improvements after review. - Increase samplesQueueCapacity. - Improve docstring for the above. - Accept a short waiting period for the ingest channel to become ready. This should depend on the HTTP timeout, but 100ms is probably good enough to cushion bursts bigger than samplesQueueCapacity, while it is unlikely that anybody will ever set an HTTP timeout similarly short. --- main.go | 2 +- retrieval/ingester.go | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index ffd74095b..199846fd7 100644 --- a/main.go +++ b/main.go @@ -52,7 +52,7 @@ var ( remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.") remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.") - samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 4096, "The capacity of the queue of samples to be stored.") + samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.") numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. 
Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") diff --git a/retrieval/ingester.go b/retrieval/ingester.go index 428c2833a..7bcda4da6 100644 --- a/retrieval/ingester.go +++ b/retrieval/ingester.go @@ -15,12 +15,15 @@ package retrieval import ( "errors" + "time" "github.com/prometheus/client_golang/extraction" clientmodel "github.com/prometheus/client_golang/model" ) +const ingestTimeout = 100 * time.Millisecond // TODO(beorn7): Adjust this to a fraction of the actual HTTP timeout. + var errIngestChannelFull = errors.New("ingestion channel full") // MergeLabelsIngester merges a labelset ontop of a given extraction result and @@ -47,13 +50,22 @@ func (i *MergeLabelsIngester) Ingest(samples clientmodel.Samples) error { type ChannelIngester chan<- clientmodel.Samples // Ingest ingests the provided extraction result by sending it to its channel. -// It returns an error if the channel is not ready to receive. This is important -// to fail fast and to not pile up ingestion requests in case of overload. +// If the channel was not able to receive the samples within the ingestTimeout, +// an error is returned. This is important to fail fast and to not pile up +// ingestion requests in case of overload. func (i ChannelIngester) Ingest(s clientmodel.Samples) error { + // Since the regular case is that i is ready to receive, first try + // without setting a timeout so that we don't need to allocate a timer + // most of the time. select { case i <- s: return nil default: - return errIngestChannelFull + select { + case i <- s: + return nil + case <-time.After(ingestTimeout): + return errIngestChannelFull + } } }