2017-05-10 09:44:13 +00:00
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
2018-05-29 08:51:29 +00:00
"context"
2022-01-04 09:46:52 +00:00
"errors"
2017-05-10 09:44:13 +00:00
"math"
2018-09-07 21:26:04 +00:00
"strconv"
2017-05-10 09:44:13 +00:00
"sync"
"time"
2021-06-11 16:17:59 +00:00
"github.com/go-kit/log"
"github.com/go-kit/log/level"
2018-09-07 21:26:04 +00:00
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
2020-11-19 15:23:03 +00:00
"github.com/prometheus/client_golang/prometheus"
2021-02-10 22:25:37 +00:00
"github.com/prometheus/common/model"
2022-01-25 10:08:04 +00:00
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
2021-10-22 08:19:38 +00:00
"go.uber.org/atomic"
2017-05-10 09:44:13 +00:00
"github.com/prometheus/prometheus/config"
2021-11-08 14:23:17 +00:00
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
2017-10-23 20:28:17 +00:00
"github.com/prometheus/prometheus/prompb"
2020-11-19 15:23:03 +00:00
"github.com/prometheus/prometheus/scrape"
2021-11-06 10:10:04 +00:00
"github.com/prometheus/prometheus/tsdb/chunks"
2019-09-19 09:15:41 +00:00
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal"
2017-05-10 09:44:13 +00:00
)
const (
	// ewmaWeight is the weight given to the Exponentially Weighted Moving
	// Averages that track samples in/out and how long pushes take.
	ewmaWeight = 0.2

	// shardUpdateDuration is the interval handed to the EWMA rates created
	// for a queue.
	shardUpdateDuration = time.Second * 10

	// shardToleranceFraction allows 30% too many shards before scaling down.
	shardToleranceFraction = 0.3
)
2020-02-03 21:47:03 +00:00
type queueManagerMetrics struct {
2020-04-25 03:39:46 +00:00
reg prometheus . Registerer
2021-05-06 20:53:52 +00:00
samplesTotal prometheus . Counter
exemplarsTotal prometheus . Counter
metadataTotal prometheus . Counter
failedSamplesTotal prometheus . Counter
failedExemplarsTotal prometheus . Counter
failedMetadataTotal prometheus . Counter
retriedSamplesTotal prometheus . Counter
retriedExemplarsTotal prometheus . Counter
retriedMetadataTotal prometheus . Counter
droppedSamplesTotal prometheus . Counter
droppedExemplarsTotal prometheus . Counter
enqueueRetriesTotal prometheus . Counter
sentBatchDuration prometheus . Histogram
highestSentTimestamp * maxTimestamp
pendingSamples prometheus . Gauge
pendingExemplars prometheus . Gauge
shardCapacity prometheus . Gauge
numShards prometheus . Gauge
maxNumShards prometheus . Gauge
minNumShards prometheus . Gauge
desiredNumShards prometheus . Gauge
sentBytesTotal prometheus . Counter
metadataBytesTotal prometheus . Counter
maxSamplesPerSend prometheus . Gauge
2020-02-03 21:47:03 +00:00
}
2020-04-25 03:39:46 +00:00
func newQueueManagerMetrics ( r prometheus . Registerer , rn , e string ) * queueManagerMetrics {
m := & queueManagerMetrics {
reg : r ,
}
constLabels := prometheus . Labels {
remoteName : rn ,
endpoint : e ,
}
2020-11-19 15:23:03 +00:00
m . samplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "samples_total" ,
Help : "Total number of samples sent to remote storage." ,
ConstLabels : constLabels ,
} )
2021-05-06 20:53:52 +00:00
m . exemplarsTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "exemplars_total" ,
Help : "Total number of exemplars sent to remote storage." ,
ConstLabels : constLabels ,
} )
2020-11-19 15:23:03 +00:00
m . metadataTotal = prometheus . NewCounter ( prometheus . CounterOpts {
2020-04-25 03:39:46 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "metadata_total" ,
Help : "Total number of metadata entries sent to remote storage." ,
2020-04-25 03:39:46 +00:00
ConstLabels : constLabels ,
} )
m . failedSamplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "samples_failed_total" ,
2020-04-25 03:39:46 +00:00
Help : "Total number of samples which failed on send to remote storage, non-recoverable errors." ,
ConstLabels : constLabels ,
} )
2021-05-06 20:53:52 +00:00
m . failedExemplarsTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "exemplars_failed_total" ,
Help : "Total number of exemplars which failed on send to remote storage, non-recoverable errors." ,
ConstLabels : constLabels ,
} )
2020-11-19 15:23:03 +00:00
m . failedMetadataTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "metadata_failed_total" ,
Help : "Total number of metadata entries which failed on send to remote storage, non-recoverable errors." ,
ConstLabels : constLabels ,
} )
2020-04-25 03:39:46 +00:00
m . retriedSamplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "samples_retried_total" ,
2020-04-25 03:39:46 +00:00
Help : "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable." ,
ConstLabels : constLabels ,
} )
2021-05-06 20:53:52 +00:00
m . retriedExemplarsTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "exemplars_retried_total" ,
Help : "Total number of exemplars which failed on send to remote storage but were retried because the send error was recoverable." ,
ConstLabels : constLabels ,
} )
2020-11-19 15:23:03 +00:00
m . retriedMetadataTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "metadata_retried_total" ,
Help : "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable." ,
ConstLabels : constLabels ,
} )
2020-04-25 03:39:46 +00:00
m . droppedSamplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "samples_dropped_total" ,
2021-05-06 20:53:52 +00:00
Help : "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID." ,
ConstLabels : constLabels ,
} )
m . droppedExemplarsTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "exemplars_dropped_total" ,
Help : "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID." ,
2020-04-25 03:39:46 +00:00
ConstLabels : constLabels ,
} )
m . enqueueRetriesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "enqueue_retries_total" ,
Help : "Total number of times enqueue has failed because a shards queue was full." ,
ConstLabels : constLabels ,
} )
m . sentBatchDuration = prometheus . NewHistogram ( prometheus . HistogramOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "sent_batch_duration_seconds" ,
2020-11-19 15:23:03 +00:00
Help : "Duration of send calls to the remote storage." ,
increase the remote write bucket range (#7323)
* increase the remote write bucket range
Increase the range of remote write buckets to capture times above 10s for laggy scenarios
Buckets had been: {.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
Buckets are now: {0.03125, 0.0625, 0.125, 0.25, 0.5, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512}
Signed-off-by: Bert Hartmann <berthartm@gmail.com>
* revert back to DefBuckets with addons to be backwards compatible
Signed-off-by: Bert Hartmann <berthartm@gmail.com>
* shuffle the buckets to maintain 2-2.5x increases
Signed-off-by: Bert Hartmann <berthartm@gmail.com>
2020-06-04 19:54:47 +00:00
Buckets : append ( prometheus . DefBuckets , 25 , 60 , 120 , 300 ) ,
2020-04-25 03:39:46 +00:00
ConstLabels : constLabels ,
} )
2020-10-15 21:53:59 +00:00
m . highestSentTimestamp = & maxTimestamp {
2020-04-25 03:39:46 +00:00
Gauge : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "queue_highest_sent_timestamp_seconds" ,
Help : "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch." ,
ConstLabels : constLabels ,
} ) ,
}
m . pendingSamples = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "samples_pending" ,
2020-04-25 03:39:46 +00:00
Help : "The number of samples pending in the queues shards to be sent to the remote storage." ,
ConstLabels : constLabels ,
} )
2021-05-06 20:53:52 +00:00
m . pendingExemplars = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "exemplars_pending" ,
Help : "The number of exemplars pending in the queues shards to be sent to the remote storage." ,
ConstLabels : constLabels ,
} )
2020-04-25 03:39:46 +00:00
m . shardCapacity = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shard_capacity" ,
Help : "The capacity of each shard of the queue used for parallel sending to the remote storage." ,
ConstLabels : constLabels ,
} )
m . numShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards" ,
Help : "The number of shards used for parallel sending to the remote storage." ,
ConstLabels : constLabels ,
} )
m . maxNumShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_max" ,
Help : "The maximum number of shards that the queue is allowed to run." ,
ConstLabels : constLabels ,
} )
m . minNumShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_min" ,
Help : "The minimum number of shards that the queue is allowed to run." ,
ConstLabels : constLabels ,
} )
m . desiredNumShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_desired" ,
Help : "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out." ,
ConstLabels : constLabels ,
} )
2021-05-06 20:53:52 +00:00
m . sentBytesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
2020-11-19 15:23:03 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
2021-05-06 20:53:52 +00:00
Name : "bytes_total" ,
Help : "The total number of bytes of data (not metadata) sent by the queue after compression. Note that when exemplars over remote write is enabled the exemplars included in a remote write request count towards this metric." ,
2020-11-19 15:23:03 +00:00
ConstLabels : constLabels ,
} )
m . metadataBytesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
2020-04-25 03:39:46 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "metadata_bytes_total" ,
Help : "The total number of bytes of metadata sent by the queue after compression." ,
2020-04-25 03:39:46 +00:00
ConstLabels : constLabels ,
} )
2020-10-28 11:39:36 +00:00
m . maxSamplesPerSend = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "max_samples_per_send" ,
2021-05-06 20:53:52 +00:00
Help : "The maximum number of samples to be sent, in a single request, to the remote storage. Note that, when sending of exemplars over remote write is enabled, exemplars count towards this limt." ,
2020-10-28 11:39:36 +00:00
ConstLabels : constLabels ,
} )
2020-02-03 21:47:03 +00:00
2020-06-26 06:33:52 +00:00
return m
}
func ( m * queueManagerMetrics ) register ( ) {
if m . reg != nil {
m . reg . MustRegister (
2020-11-19 15:23:03 +00:00
m . samplesTotal ,
2021-05-06 20:53:52 +00:00
m . exemplarsTotal ,
2020-11-19 15:23:03 +00:00
m . metadataTotal ,
2020-02-03 21:47:03 +00:00
m . failedSamplesTotal ,
2021-05-06 20:53:52 +00:00
m . failedExemplarsTotal ,
2020-11-19 15:23:03 +00:00
m . failedMetadataTotal ,
2020-02-03 21:47:03 +00:00
m . retriedSamplesTotal ,
2021-05-06 20:53:52 +00:00
m . retriedExemplarsTotal ,
2020-11-19 15:23:03 +00:00
m . retriedMetadataTotal ,
2020-02-03 21:47:03 +00:00
m . droppedSamplesTotal ,
2021-05-06 20:53:52 +00:00
m . droppedExemplarsTotal ,
2020-02-03 21:47:03 +00:00
m . enqueueRetriesTotal ,
m . sentBatchDuration ,
2020-04-25 03:39:46 +00:00
m . highestSentTimestamp ,
m . pendingSamples ,
2021-05-06 20:53:52 +00:00
m . pendingExemplars ,
2020-02-03 21:47:03 +00:00
m . shardCapacity ,
m . numShards ,
m . maxNumShards ,
m . minNumShards ,
m . desiredNumShards ,
2021-05-06 20:53:52 +00:00
m . sentBytesTotal ,
2020-11-19 15:23:03 +00:00
m . metadataBytesTotal ,
2020-10-28 11:39:36 +00:00
m . maxSamplesPerSend ,
2020-02-03 21:47:03 +00:00
)
}
}
2017-05-10 09:44:13 +00:00
2020-04-25 03:39:46 +00:00
func ( m * queueManagerMetrics ) unregister ( ) {
if m . reg != nil {
2020-11-19 15:23:03 +00:00
m . reg . Unregister ( m . samplesTotal )
2021-05-06 20:53:52 +00:00
m . reg . Unregister ( m . exemplarsTotal )
2020-11-19 15:23:03 +00:00
m . reg . Unregister ( m . metadataTotal )
2020-04-25 03:39:46 +00:00
m . reg . Unregister ( m . failedSamplesTotal )
2021-05-06 20:53:52 +00:00
m . reg . Unregister ( m . failedExemplarsTotal )
2020-11-19 15:23:03 +00:00
m . reg . Unregister ( m . failedMetadataTotal )
2020-04-25 03:39:46 +00:00
m . reg . Unregister ( m . retriedSamplesTotal )
2021-05-06 20:53:52 +00:00
m . reg . Unregister ( m . retriedExemplarsTotal )
2020-11-19 15:23:03 +00:00
m . reg . Unregister ( m . retriedMetadataTotal )
2020-04-25 03:39:46 +00:00
m . reg . Unregister ( m . droppedSamplesTotal )
2021-05-06 20:53:52 +00:00
m . reg . Unregister ( m . droppedExemplarsTotal )
2020-04-25 03:39:46 +00:00
m . reg . Unregister ( m . enqueueRetriesTotal )
m . reg . Unregister ( m . sentBatchDuration )
m . reg . Unregister ( m . highestSentTimestamp )
m . reg . Unregister ( m . pendingSamples )
2021-05-06 20:53:52 +00:00
m . reg . Unregister ( m . pendingExemplars )
2020-04-25 03:39:46 +00:00
m . reg . Unregister ( m . shardCapacity )
m . reg . Unregister ( m . numShards )
m . reg . Unregister ( m . maxNumShards )
m . reg . Unregister ( m . minNumShards )
m . reg . Unregister ( m . desiredNumShards )
2021-05-06 20:53:52 +00:00
m . reg . Unregister ( m . sentBytesTotal )
2020-11-19 15:23:03 +00:00
m . reg . Unregister ( m . metadataBytesTotal )
2020-10-28 11:39:36 +00:00
m . reg . Unregister ( m . maxSamplesPerSend )
2020-04-25 03:39:46 +00:00
}
}
2020-06-24 13:41:52 +00:00
// WriteClient is the contract a queue needs from an external timeseries
// database in order to ship batches of samples to it.
type WriteClient interface {
	// Store stores the given samples in the remote storage. req is the
	// already-encoded request payload.
	Store(ctx context.Context, req []byte) error

	// Name uniquely identifies the remote storage.
	Name() string

	// Endpoint is the remote read or write endpoint for the storage client.
	Endpoint() string
}
// QueueManager manages a queue of samples to be sent to the Storage
2020-06-24 13:41:52 +00:00
// indicated by the provided WriteClient. Implements writeTo interface
2018-09-07 21:26:04 +00:00
// used by WAL Watcher.
2017-05-10 09:44:13 +00:00
type QueueManager struct {
2020-07-30 07:45:42 +00:00
lastSendTimestamp atomic . Int64
2019-10-21 21:54:25 +00:00
2020-11-19 15:23:03 +00:00
logger log . Logger
flushDeadline time . Duration
cfg config . QueueConfig
mcfg config . MetadataConfig
externalLabels labels . Labels
relabelConfigs [ ] * relabel . Config
2021-05-06 20:53:52 +00:00
sendExemplars bool
2020-11-19 15:23:03 +00:00
watcher * wal . Watcher
metadataWatcher * MetadataWatcher
2018-09-07 21:26:04 +00:00
2020-03-31 03:39:29 +00:00
clientMtx sync . RWMutex
2020-06-24 13:41:52 +00:00
storeClient WriteClient
2020-03-31 03:39:29 +00:00
2021-07-27 20:21:48 +00:00
seriesMtx sync . Mutex // Covers seriesLabels and droppedSeries.
2021-11-06 10:10:04 +00:00
seriesLabels map [ chunks . HeadSeriesRef ] labels . Labels
droppedSeries map [ chunks . HeadSeriesRef ] struct { }
2021-07-27 20:21:48 +00:00
seriesSegmentMtx sync . Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
2021-11-06 10:10:04 +00:00
seriesSegmentIndexes map [ chunks . HeadSeriesRef ] int
2017-05-10 09:44:13 +00:00
shards * shards
numShards int
reshardChan chan int
2019-01-18 12:48:16 +00:00
quit chan struct { }
wg sync . WaitGroup
2017-05-10 09:44:13 +00:00
2021-05-06 20:53:52 +00:00
dataIn , dataDropped , dataOut , dataOutDuration * ewmaRate
2019-03-05 12:21:11 +00:00
2020-09-24 18:44:18 +00:00
metrics * queueManagerMetrics
interner * pool
2020-10-15 21:53:59 +00:00
highestRecvTimestamp * maxTimestamp
2017-05-10 09:44:13 +00:00
}
// NewQueueManager builds a new QueueManager.
2020-04-25 03:39:46 +00:00
func NewQueueManager (
metrics * queueManagerMetrics ,
watcherMetrics * wal . WatcherMetrics ,
readerMetrics * wal . LiveReaderMetrics ,
logger log . Logger ,
walDir string ,
samplesIn * ewmaRate ,
cfg config . QueueConfig ,
2020-11-19 15:23:03 +00:00
mCfg config . MetadataConfig ,
2020-04-25 03:39:46 +00:00
externalLabels labels . Labels ,
relabelConfigs [ ] * relabel . Config ,
2020-06-24 13:41:52 +00:00
client WriteClient ,
2020-04-25 03:39:46 +00:00
flushDeadline time . Duration ,
2020-09-24 18:44:18 +00:00
interner * pool ,
2020-10-15 21:53:59 +00:00
highestRecvTimestamp * maxTimestamp ,
2020-11-19 15:23:03 +00:00
sm ReadyScrapeManager ,
2021-05-06 20:53:52 +00:00
enableExemplarRemoteWrite bool ,
2020-04-25 03:39:46 +00:00
) * QueueManager {
2017-08-11 18:45:52 +00:00
if logger == nil {
logger = log . NewNopLogger ( )
}
2019-03-05 12:21:11 +00:00
2019-12-12 20:47:23 +00:00
logger = log . With ( logger , remoteName , client . Name ( ) , endpoint , client . Endpoint ( ) )
2017-05-10 09:44:13 +00:00
t := & QueueManager {
2019-03-05 12:21:11 +00:00
logger : logger ,
2018-05-23 14:03:54 +00:00
flushDeadline : flushDeadline ,
2017-05-10 09:44:13 +00:00
cfg : cfg ,
2020-11-19 15:23:03 +00:00
mcfg : mCfg ,
2017-05-10 09:44:13 +00:00
externalLabels : externalLabels ,
relabelConfigs : relabelConfigs ,
2020-03-31 03:39:29 +00:00
storeClient : client ,
2021-05-06 20:53:52 +00:00
sendExemplars : enableExemplarRemoteWrite ,
2017-05-10 09:44:13 +00:00
2021-11-06 10:10:04 +00:00
seriesLabels : make ( map [ chunks . HeadSeriesRef ] labels . Labels ) ,
seriesSegmentIndexes : make ( map [ chunks . HeadSeriesRef ] int ) ,
droppedSeries : make ( map [ chunks . HeadSeriesRef ] struct { } ) ,
2018-09-07 21:26:04 +00:00
2018-12-04 17:32:14 +00:00
numShards : cfg . MinShards ,
2017-05-10 09:44:13 +00:00
reshardChan : make ( chan int ) ,
quit : make ( chan struct { } ) ,
2021-05-06 20:53:52 +00:00
dataIn : samplesIn ,
dataDropped : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
dataOut : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
dataOutDuration : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
2020-02-03 21:47:03 +00:00
2020-09-24 18:44:18 +00:00
metrics : metrics ,
interner : interner ,
highestRecvTimestamp : highestRecvTimestamp ,
2019-03-01 19:04:26 +00:00
}
2018-09-07 21:26:04 +00:00
2021-05-06 20:53:52 +00:00
t . watcher = wal . NewWatcher ( watcherMetrics , readerMetrics , logger , client . Name ( ) , t , walDir , enableExemplarRemoteWrite )
2020-11-19 15:23:03 +00:00
if t . mcfg . Send {
t . metadataWatcher = NewMetadataWatcher ( logger , sm , client . Name ( ) , t , t . mcfg . SendInterval , flushDeadline )
}
2019-03-05 12:21:11 +00:00
t . shards = t . newShards ( )
2017-05-10 09:44:13 +00:00
return t
}
2021-10-29 21:44:40 +00:00
// AppendMetadata sends metadata the remote storage. Metadata is sent in batches, but is not parallelized.
2020-11-19 15:23:03 +00:00
func ( t * QueueManager ) AppendMetadata ( ctx context . Context , metadata [ ] scrape . MetricMetadata ) {
mm := make ( [ ] prompb . MetricMetadata , 0 , len ( metadata ) )
for _ , entry := range metadata {
mm = append ( mm , prompb . MetricMetadata {
MetricFamilyName : entry . Metric ,
Help : entry . Help ,
Type : metricTypeToMetricTypeProto ( entry . Type ) ,
Unit : entry . Unit ,
} )
}
2021-10-29 21:44:40 +00:00
pBuf := proto . NewBuffer ( nil )
2021-06-24 22:39:50 +00:00
numSends := int ( math . Ceil ( float64 ( len ( metadata ) ) / float64 ( t . mcfg . MaxSamplesPerSend ) ) )
for i := 0 ; i < numSends ; i ++ {
last := ( i + 1 ) * t . mcfg . MaxSamplesPerSend
if last > len ( metadata ) {
last = len ( metadata )
}
2021-10-29 21:44:40 +00:00
err := t . sendMetadataWithBackoff ( ctx , mm [ i * t . mcfg . MaxSamplesPerSend : last ] , pBuf )
2021-06-24 22:39:50 +00:00
if err != nil {
t . metrics . failedMetadataTotal . Add ( float64 ( last - ( i * t . mcfg . MaxSamplesPerSend ) ) )
level . Error ( t . logger ) . Log ( "msg" , "non-recoverable error while sending metadata" , "count" , last - ( i * t . mcfg . MaxSamplesPerSend ) , "err" , err )
}
2020-11-19 15:23:03 +00:00
}
}
2021-10-29 21:44:40 +00:00
func ( t * QueueManager ) sendMetadataWithBackoff ( ctx context . Context , metadata [ ] prompb . MetricMetadata , pBuf * proto . Buffer ) error {
2020-11-19 15:23:03 +00:00
// Build the WriteRequest with no samples.
2021-10-29 21:44:40 +00:00
req , _ , err := buildWriteRequest ( nil , metadata , pBuf , nil )
2020-11-19 15:23:03 +00:00
if err != nil {
return err
}
metadataCount := len ( metadata )
attemptStore := func ( try int ) error {
2022-01-25 10:08:04 +00:00
ctx , span := otel . Tracer ( "" ) . Start ( ctx , "Remote Metadata Send Batch" )
defer span . End ( )
span . SetAttributes (
attribute . Int ( "metadata" , metadataCount ) ,
attribute . Int ( "try" , try ) ,
attribute . String ( "remote_name" , t . storeClient . Name ( ) ) ,
attribute . String ( "remote_url" , t . storeClient . Endpoint ( ) ) ,
)
2020-11-19 15:23:03 +00:00
begin := time . Now ( )
err := t . storeClient . Store ( ctx , req )
t . metrics . sentBatchDuration . Observe ( time . Since ( begin ) . Seconds ( ) )
if err != nil {
2022-01-25 10:08:04 +00:00
span . RecordError ( err )
2020-11-19 15:23:03 +00:00
return err
}
return nil
}
retry := func ( ) {
t . metrics . retriedMetadataTotal . Add ( float64 ( len ( metadata ) ) )
}
2021-02-04 13:38:32 +00:00
err = sendWriteRequestWithBackoff ( ctx , t . cfg , t . logger , attemptStore , retry )
2020-11-19 15:23:03 +00:00
if err != nil {
return err
}
t . metrics . metadataTotal . Add ( float64 ( len ( metadata ) ) )
t . metrics . metadataBytesTotal . Add ( float64 ( len ( req ) ) )
return nil
}
2018-09-07 21:26:04 +00:00
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
2019-09-19 09:15:41 +00:00
func ( t * QueueManager ) Append ( samples [ ] record . RefSample ) bool {
2019-06-27 18:48:21 +00:00
outer :
2019-08-12 16:22:02 +00:00
for _ , s := range samples {
2019-09-13 17:23:58 +00:00
t . seriesMtx . Lock ( )
2019-08-12 16:22:02 +00:00
lbls , ok := t . seriesLabels [ s . Ref ]
2019-06-27 18:48:21 +00:00
if ! ok {
2020-04-25 03:39:46 +00:00
t . metrics . droppedSamplesTotal . Inc ( )
2021-05-06 20:53:52 +00:00
t . dataDropped . incr ( 1 )
2019-08-12 16:22:02 +00:00
if _ , ok := t . droppedSeries [ s . Ref ] ; ! ok {
2020-04-11 08:22:18 +00:00
level . Info ( t . logger ) . Log ( "msg" , "Dropped sample for series that was not explicitly dropped via relabelling" , "ref" , s . Ref )
2018-09-07 21:26:04 +00:00
}
2019-09-13 17:23:58 +00:00
t . seriesMtx . Unlock ( )
2018-09-07 21:26:04 +00:00
continue
}
2019-09-13 17:23:58 +00:00
t . seriesMtx . Unlock ( )
2022-03-03 17:40:09 +00:00
// Start with a very small backoff. This should not be t.cfg.MinBackoff
// as it can happen without errors, and we want to pickup work after
// filling a queue/resharding as quickly as possible.
// TODO: Consider using the average duration of a request as the backoff.
backoff := model . Duration ( 5 * time . Millisecond )
2018-09-07 21:26:04 +00:00
for {
select {
case <- t . quit :
return false
default :
}
2021-12-03 14:30:42 +00:00
if t . shards . enqueue ( s . Ref , sampleOrExemplar {
seriesLabels : lbls ,
timestamp : s . T ,
value : s . V ,
isSample : true ,
} ) {
2021-05-06 20:53:52 +00:00
continue outer
}
t . metrics . enqueueRetriesTotal . Inc ( )
time . Sleep ( time . Duration ( backoff ) )
backoff = backoff * 2
2022-03-03 17:40:09 +00:00
// It is reasonable to use t.cfg.MaxBackoff here, as if we have hit
// the full backoff we are likely waiting for external resources.
2021-05-06 20:53:52 +00:00
if backoff > t . cfg . MaxBackoff {
backoff = t . cfg . MaxBackoff
}
}
}
return true
}
2017-05-10 09:44:13 +00:00
2021-05-06 20:53:52 +00:00
func ( t * QueueManager ) AppendExemplars ( exemplars [ ] record . RefExemplar ) bool {
if ! t . sendExemplars {
return true
}
outer :
for _ , e := range exemplars {
t . seriesMtx . Lock ( )
lbls , ok := t . seriesLabels [ e . Ref ]
if ! ok {
t . metrics . droppedExemplarsTotal . Inc ( )
// Track dropped exemplars in the same EWMA for sharding calc.
t . dataDropped . incr ( 1 )
if _ , ok := t . droppedSeries [ e . Ref ] ; ! ok {
level . Info ( t . logger ) . Log ( "msg" , "Dropped exemplar for series that was not explicitly dropped via relabelling" , "ref" , e . Ref )
}
t . seriesMtx . Unlock ( )
continue
}
t . seriesMtx . Unlock ( )
// This will only loop if the queues are being resharded.
backoff := t . cfg . MinBackoff
for {
select {
case <- t . quit :
return false
default :
}
2021-12-03 14:30:42 +00:00
if t . shards . enqueue ( e . Ref , sampleOrExemplar {
seriesLabels : lbls ,
timestamp : e . T ,
value : e . V ,
exemplarLabels : e . Labels ,
} ) {
2018-09-07 21:26:04 +00:00
continue outer
}
2019-01-18 12:48:16 +00:00
2020-04-25 03:39:46 +00:00
t . metrics . enqueueRetriesTotal . Inc ( )
2018-09-07 21:26:04 +00:00
time . Sleep ( time . Duration ( backoff ) )
backoff = backoff * 2
if backoff > t . cfg . MaxBackoff {
backoff = t . cfg . MaxBackoff
}
2017-05-10 09:44:13 +00:00
}
}
2018-09-07 21:26:04 +00:00
return true
2017-05-10 09:44:13 +00:00
}
// Start the queue manager sending samples to the remote storage.
// Does not block.
func ( t * QueueManager ) Start ( ) {
2020-06-26 06:33:52 +00:00
// Register and initialise some metrics.
t . metrics . register ( )
2020-04-25 03:39:46 +00:00
t . metrics . shardCapacity . Set ( float64 ( t . cfg . Capacity ) )
t . metrics . maxNumShards . Set ( float64 ( t . cfg . MaxShards ) )
t . metrics . minNumShards . Set ( float64 ( t . cfg . MinShards ) )
t . metrics . desiredNumShards . Set ( float64 ( t . cfg . MinShards ) )
2020-10-28 11:39:36 +00:00
t . metrics . maxSamplesPerSend . Set ( float64 ( t . cfg . MaxSamplesPerSend ) )
2019-04-23 08:49:17 +00:00
2018-09-07 21:26:04 +00:00
t . shards . start ( t . numShards )
t . watcher . Start ( )
2020-11-19 15:23:03 +00:00
if t . mcfg . Send {
t . metadataWatcher . Start ( )
}
2018-09-07 21:26:04 +00:00
2017-05-10 09:44:13 +00:00
t . wg . Add ( 2 )
go t . updateShardsLoop ( )
go t . reshardLoop ( )
}
// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func ( t * QueueManager ) Stop ( ) {
2017-08-11 18:45:52 +00:00
level . Info ( t . logger ) . Log ( "msg" , "Stopping remote storage..." )
2018-09-07 21:26:04 +00:00
defer level . Info ( t . logger ) . Log ( "msg" , "Remote storage stopped." )
2017-05-10 09:44:13 +00:00
close ( t . quit )
2019-04-16 10:25:19 +00:00
t . wg . Wait ( )
2020-11-19 15:23:03 +00:00
// Wait for all QueueManager routines to end before stopping shards, metadata watcher, and WAL watcher. This
2019-04-16 10:25:19 +00:00
// is to ensure we don't end up executing a reshard and shards.stop() at the same time, which
// causes a closed channel panic.
2018-09-07 21:26:04 +00:00
t . shards . stop ( )
t . watcher . Stop ( )
2020-11-19 15:23:03 +00:00
if t . mcfg . Send {
t . metadataWatcher . Stop ( )
}
2019-03-13 10:02:36 +00:00
// On shutdown, release the strings in the labels from the intern pool.
2019-09-13 17:23:58 +00:00
t . seriesMtx . Lock ( )
2019-03-13 10:02:36 +00:00
for _ , labels := range t . seriesLabels {
2020-09-24 18:44:18 +00:00
t . releaseLabels ( labels )
2019-03-13 10:02:36 +00:00
}
2019-09-13 17:23:58 +00:00
t . seriesMtx . Unlock ( )
2020-04-25 03:39:46 +00:00
t . metrics . unregister ( )
2018-09-07 21:26:04 +00:00
}
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
2019-09-19 09:15:41 +00:00
func ( t * QueueManager ) StoreSeries ( series [ ] record . RefSeries , index int ) {
2019-09-13 17:23:58 +00:00
t . seriesMtx . Lock ( )
defer t . seriesMtx . Unlock ( )
2021-07-27 20:21:48 +00:00
t . seriesSegmentMtx . Lock ( )
defer t . seriesSegmentMtx . Unlock ( )
2018-09-07 21:26:04 +00:00
for _ , s := range series {
2021-01-22 15:03:10 +00:00
// Just make sure all the Refs of Series will insert into seriesSegmentIndexes map for tracking.
t . seriesSegmentIndexes [ s . Ref ] = index
2019-03-08 16:29:25 +00:00
ls := processExternalLabels ( s . Labels , t . externalLabels )
2019-08-07 19:39:07 +00:00
lbls := relabel . Process ( ls , t . relabelConfigs ... )
if len ( lbls ) == 0 {
2018-09-07 21:26:04 +00:00
t . droppedSeries [ s . Ref ] = struct { } { }
continue
}
2020-09-24 18:44:18 +00:00
t . internLabels ( lbls )
2019-03-11 23:44:23 +00:00
2019-03-13 10:02:36 +00:00
// We should not ever be replacing a series labels in the map, but just
// in case we do we need to ensure we do not leak the replaced interned
// strings.
2019-06-27 18:48:21 +00:00
if orig , ok := t . seriesLabels [ s . Ref ] ; ok {
2020-09-24 18:44:18 +00:00
t . releaseLabels ( orig )
2019-03-11 23:44:23 +00:00
}
2019-08-07 19:39:07 +00:00
t . seriesLabels [ s . Ref ] = lbls
2018-09-07 21:26:04 +00:00
}
}
2017-05-10 09:44:13 +00:00
2021-11-18 08:26:07 +00:00
// UpdateSeriesSegment updates the segment number held against the series,
// so we can trim older ones in SeriesReset.
2021-07-27 20:21:48 +00:00
func ( t * QueueManager ) UpdateSeriesSegment ( series [ ] record . RefSeries , index int ) {
t . seriesSegmentMtx . Lock ( )
defer t . seriesSegmentMtx . Unlock ( )
for _ , s := range series {
t . seriesSegmentIndexes [ s . Ref ] = index
}
}
2018-09-07 21:26:04 +00:00
// SeriesReset is used when reading a checkpoint. WAL Watcher should have
// stored series records with the checkpoints index number, so we can now
// delete any ref ID's lower than that # from the two maps.
func ( t * QueueManager ) SeriesReset ( index int ) {
2019-09-13 17:23:58 +00:00
t . seriesMtx . Lock ( )
defer t . seriesMtx . Unlock ( )
2021-07-27 20:21:48 +00:00
t . seriesSegmentMtx . Lock ( )
defer t . seriesSegmentMtx . Unlock ( )
2018-09-07 21:26:04 +00:00
// Check for series that are in segments older than the checkpoint
// that were not also present in the checkpoint.
for k , v := range t . seriesSegmentIndexes {
if v < index {
delete ( t . seriesSegmentIndexes , k )
2020-09-24 18:44:18 +00:00
t . releaseLabels ( t . seriesLabels [ k ] )
2019-03-11 23:44:23 +00:00
delete ( t . seriesLabels , k )
2019-09-13 17:23:58 +00:00
delete ( t . droppedSeries , k )
2018-09-07 21:26:04 +00:00
}
}
}
2017-08-11 18:45:52 +00:00
2020-03-31 03:39:29 +00:00
// SetClient updates the client used by a queue. Used when only client specific
// fields are updated to avoid restarting the queue.
2020-06-24 13:41:52 +00:00
func ( t * QueueManager ) SetClient ( c WriteClient ) {
2020-03-31 03:39:29 +00:00
t . clientMtx . Lock ( )
t . storeClient = c
t . clientMtx . Unlock ( )
}
2020-06-24 13:41:52 +00:00
func ( t * QueueManager ) client ( ) WriteClient {
2020-03-31 03:39:29 +00:00
t . clientMtx . RLock ( )
defer t . clientMtx . RUnlock ( )
return t . storeClient
}
2020-09-24 18:44:18 +00:00
func ( t * QueueManager ) internLabels ( lbls labels . Labels ) {
2019-08-07 19:39:07 +00:00
for i , l := range lbls {
2020-09-24 18:44:18 +00:00
lbls [ i ] . Name = t . interner . intern ( l . Name )
lbls [ i ] . Value = t . interner . intern ( l . Value )
2019-08-07 19:39:07 +00:00
}
}
2020-09-24 18:44:18 +00:00
func ( t * QueueManager ) releaseLabels ( ls labels . Labels ) {
2019-03-11 23:44:23 +00:00
for _ , l := range ls {
2020-09-24 18:44:18 +00:00
t . interner . release ( l . Name )
t . interner . release ( l . Value )
2019-03-11 23:44:23 +00:00
}
}
2019-03-13 10:02:36 +00:00
// processExternalLabels merges externalLabels into ls. If ls contains
2019-03-08 16:29:25 +00:00
// a label in externalLabels, the value in ls wins.
2021-10-22 08:06:44 +00:00
func processExternalLabels ( ls , externalLabels labels . Labels ) labels . Labels {
2019-03-08 16:29:25 +00:00
i , j , result := 0 , 0 , make ( labels . Labels , 0 , len ( ls ) + len ( externalLabels ) )
for i < len ( ls ) && j < len ( externalLabels ) {
if ls [ i ] . Name < externalLabels [ j ] . Name {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
i ++
} else if ls [ i ] . Name > externalLabels [ j ] . Name {
result = append ( result , externalLabels [ j ] )
j ++
} else {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
i ++
j ++
2018-09-07 21:26:04 +00:00
}
}
2021-04-30 16:37:07 +00:00
return append ( append ( result , ls [ i : ] ... ) , externalLabels [ j : ] ... )
2017-05-10 09:44:13 +00:00
}
func ( t * QueueManager ) updateShardsLoop ( ) {
defer t . wg . Done ( )
2017-10-09 16:36:20 +00:00
ticker := time . NewTicker ( shardUpdateDuration )
defer ticker . Stop ( )
2017-05-10 09:44:13 +00:00
for {
select {
2017-10-09 16:36:20 +00:00
case <- ticker . C :
2019-10-21 21:54:25 +00:00
desiredShards := t . calculateDesiredShards ( )
2020-04-20 22:20:39 +00:00
if ! t . shouldReshard ( desiredShards ) {
2019-10-21 21:54:25 +00:00
continue
}
// Resharding can take some time, and we want this loop
// to stay close to shardUpdateDuration.
select {
case t . reshardChan <- desiredShards :
2019-11-26 13:22:56 +00:00
level . Info ( t . logger ) . Log ( "msg" , "Remote storage resharding" , "from" , t . numShards , "to" , desiredShards )
2019-10-21 21:54:25 +00:00
t . numShards = desiredShards
default :
level . Info ( t . logger ) . Log ( "msg" , "Currently resharding, skipping." )
}
2017-05-10 09:44:13 +00:00
case <- t . quit :
return
}
}
}
2020-04-20 22:20:39 +00:00
// shouldReshard returns if resharding should occur
func ( t * QueueManager ) shouldReshard ( desiredShards int ) bool {
if desiredShards == t . numShards {
return false
}
// We shouldn't reshard if Prometheus hasn't been able to send to the
// remote endpoint successfully within some period of time.
minSendTimestamp := time . Now ( ) . Add ( - 2 * time . Duration ( t . cfg . BatchSendDeadline ) ) . Unix ( )
2020-07-30 07:45:42 +00:00
lsts := t . lastSendTimestamp . Load ( )
2020-04-20 22:20:39 +00:00
if lsts < minSendTimestamp {
level . Warn ( t . logger ) . Log ( "msg" , "Skipping resharding, last successful send was beyond threshold" , "lastSendTimestamp" , lsts , "minSendTimestamp" , minSendTimestamp )
return false
}
return true
}
2019-10-21 21:54:25 +00:00
// calculateDesiredShards returns the number of desired shards, which will be
// the current QueueManager.numShards if resharding should not occur for reasons
// outlined in this functions implementation. It is up to the caller to reshard, or not,
// based on the return value.
func ( t * QueueManager ) calculateDesiredShards ( ) int {
2021-05-06 20:53:52 +00:00
t . dataOut . tick ( )
t . dataDropped . tick ( )
t . dataOutDuration . tick ( )
2017-05-10 09:44:13 +00:00
// We use the number of incoming samples as a prediction of how much work we
// will need to do next iteration. We add to this any pending samples
// (received - send) so we can catch up with any backlog. We use the average
// outgoing batch latency to work out how many shards we need.
var (
2021-05-06 20:53:52 +00:00
dataInRate = t . dataIn . rate ( )
dataOutRate = t . dataOut . rate ( )
dataKeptRatio = dataOutRate / ( t . dataDropped . rate ( ) + dataOutRate )
dataOutDuration = t . dataOutDuration . rate ( ) / float64 ( time . Second )
dataPendingRate = dataInRate * dataKeptRatio - dataOutRate
highestSent = t . metrics . highestSentTimestamp . Get ( )
highestRecv = t . highestRecvTimestamp . Get ( )
delay = highestRecv - highestSent
dataPending = delay * dataInRate * dataKeptRatio
2017-05-10 09:44:13 +00:00
)
2021-05-06 20:53:52 +00:00
if dataOutRate <= 0 {
2019-10-21 21:54:25 +00:00
return t . numShards
2017-05-10 09:44:13 +00:00
}
var (
2021-08-29 17:11:13 +00:00
// When behind we will try to catch up on 5% of samples per second.
backlogCatchup = 0.05 * dataPending
// Calculate Time to send one sample, averaged across all sends done this tick.
2021-05-06 20:53:52 +00:00
timePerSample = dataOutDuration / dataOutRate
2021-08-29 17:11:13 +00:00
desiredShards = timePerSample * ( dataInRate * dataKeptRatio + backlogCatchup )
2017-05-10 09:44:13 +00:00
)
2020-04-25 03:39:46 +00:00
t . metrics . desiredNumShards . Set ( desiredShards )
2019-08-10 15:24:58 +00:00
level . Debug ( t . logger ) . Log ( "msg" , "QueueManager.calculateDesiredShards" ,
2021-05-06 20:53:52 +00:00
"dataInRate" , dataInRate ,
"dataOutRate" , dataOutRate ,
"dataKeptRatio" , dataKeptRatio ,
"dataPendingRate" , dataPendingRate ,
"dataPending" , dataPending ,
"dataOutDuration" , dataOutDuration ,
2019-03-01 19:04:26 +00:00
"timePerSample" , timePerSample ,
"desiredShards" , desiredShards ,
"highestSent" , highestSent ,
2019-08-13 09:10:21 +00:00
"highestRecv" , highestRecv ,
)
2017-05-10 09:44:13 +00:00
// Changes in the number of shards must be greater than shardToleranceFraction.
var (
lowerBound = float64 ( t . numShards ) * ( 1. - shardToleranceFraction )
upperBound = float64 ( t . numShards ) * ( 1. + shardToleranceFraction )
)
2017-08-11 18:45:52 +00:00
level . Debug ( t . logger ) . Log ( "msg" , "QueueManager.updateShardsLoop" ,
"lowerBound" , lowerBound , "desiredShards" , desiredShards , "upperBound" , upperBound )
2021-08-30 12:39:16 +00:00
desiredShards = math . Ceil ( desiredShards ) // Round up to be on the safe side.
2017-05-10 09:44:13 +00:00
if lowerBound <= desiredShards && desiredShards <= upperBound {
2019-10-21 21:54:25 +00:00
return t . numShards
2017-05-10 09:44:13 +00:00
}
2021-08-30 12:39:16 +00:00
numShards := int ( desiredShards )
2020-01-02 17:30:56 +00:00
// Do not downshard if we are more than ten seconds back.
if numShards < t . numShards && delay > 10.0 {
level . Debug ( t . logger ) . Log ( "msg" , "Not downsharding due to being too far behind" )
return t . numShards
}
2017-05-10 09:44:13 +00:00
if numShards > t . cfg . MaxShards {
numShards = t . cfg . MaxShards
2018-12-04 17:32:14 +00:00
} else if numShards < t . cfg . MinShards {
numShards = t . cfg . MinShards
2017-05-10 09:44:13 +00:00
}
2019-10-21 21:54:25 +00:00
return numShards
2017-05-10 09:44:13 +00:00
}
func ( t * QueueManager ) reshardLoop ( ) {
defer t . wg . Done ( )
for {
select {
case numShards := <- t . reshardChan :
2018-09-07 21:26:04 +00:00
// We start the newShards after we have stopped (the therefore completely
// flushed) the oldShards, to guarantee we only every deliver samples in
// order.
t . shards . stop ( )
t . shards . start ( numShards )
2017-05-10 09:44:13 +00:00
case <- t . quit :
return
}
}
}
2018-09-07 21:26:04 +00:00
// newShards returns a shards value bound to this queue manager with a fresh
// done channel. No queues exist until start is called on it.
func (t *QueueManager) newShards() *shards {
	return &shards{
		qm:   t,
		done: make(chan struct{}),
	}
}
2017-05-10 09:44:13 +00:00
type shards struct {
2022-03-03 17:40:09 +00:00
mtx sync . RWMutex // With the WAL, this is never actually contended.
2018-09-07 21:26:04 +00:00
qm * QueueManager
2021-11-19 21:11:32 +00:00
queues [ ] * queue
2021-05-06 20:53:52 +00:00
// So we can accurately track how many of each are lost during shard shutdowns.
enqueuedSamples atomic . Int64
enqueuedExemplars atomic . Int64
2018-09-07 21:26:04 +00:00
// Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group.
2018-02-01 13:20:38 +00:00
done chan struct { }
2020-07-30 07:45:42 +00:00
running atomic . Int32
2018-09-07 21:26:04 +00:00
// Soft shutdown context will prevent new enqueues and deadlocks.
softShutdown chan struct { }
// Hard shutdown context is used to terminate outgoing HTTP connections
// after giving them a chance to terminate.
2021-05-06 20:53:52 +00:00
hardShutdown context . CancelFunc
samplesDroppedOnHardShutdown atomic . Uint32
exemplarsDroppedOnHardShutdown atomic . Uint32
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
// start the shards; must be called before any call to enqueue.
func ( s * shards ) start ( n int ) {
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
2020-06-25 20:48:30 +00:00
s . qm . metrics . pendingSamples . Set ( 0 )
s . qm . metrics . numShards . Set ( float64 ( n ) )
2021-11-19 21:11:32 +00:00
newQueues := make ( [ ] * queue , n )
2018-09-07 21:26:04 +00:00
for i := 0 ; i < n ; i ++ {
2021-11-19 21:11:32 +00:00
newQueues [ i ] = newQueue ( s . qm . cfg . MaxSamplesPerSend , s . qm . cfg . Capacity )
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
s . queues = newQueues
var hardShutdownCtx context . Context
hardShutdownCtx , s . hardShutdown = context . WithCancel ( context . Background ( ) )
s . softShutdown = make ( chan struct { } )
2020-07-30 07:45:42 +00:00
s . running . Store ( int32 ( n ) )
2018-09-07 21:26:04 +00:00
s . done = make ( chan struct { } )
2022-02-11 14:07:41 +00:00
s . enqueuedSamples . Store ( 0 )
s . enqueuedExemplars . Store ( 0 )
2021-05-06 20:53:52 +00:00
s . samplesDroppedOnHardShutdown . Store ( 0 )
s . exemplarsDroppedOnHardShutdown . Store ( 0 )
2018-09-07 21:26:04 +00:00
for i := 0 ; i < n ; i ++ {
go s . runShard ( hardShutdownCtx , i , newQueues [ i ] )
2017-05-10 09:44:13 +00:00
}
}
2018-09-07 21:26:04 +00:00
// stop the shards; subsequent call to enqueue will return false.
func ( s * shards ) stop ( ) {
// Attempt a clean shutdown, but only wait flushDeadline for all the shards
2019-03-03 11:35:29 +00:00
// to cleanly exit. As we're doing RPCs, enqueue can block indefinitely.
2018-09-07 21:26:04 +00:00
// We must be able so call stop concurrently, hence we can only take the
// RLock here.
s . mtx . RLock ( )
close ( s . softShutdown )
s . mtx . RUnlock ( )
// Enqueue should now be unblocked, so we can take the write lock. This
// also ensures we don't race with writes to the queues, and get a panic:
// send on closed channel.
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
for _ , queue := range s . queues {
2021-11-19 21:11:32 +00:00
go queue . FlushAndShutdown ( s . done )
2017-05-10 09:44:13 +00:00
}
2018-01-31 15:41:48 +00:00
select {
2018-02-01 13:20:38 +00:00
case <- s . done :
2018-05-29 08:51:29 +00:00
return
2018-09-07 21:26:04 +00:00
case <- time . After ( s . qm . flushDeadline ) :
2018-01-31 15:41:48 +00:00
}
2018-05-29 08:51:29 +00:00
2018-05-29 10:35:43 +00:00
// Force an unclean shutdown.
2018-09-07 21:26:04 +00:00
s . hardShutdown ( )
2018-05-29 08:51:29 +00:00
<- s . done
2021-05-06 20:53:52 +00:00
if dropped := s . samplesDroppedOnHardShutdown . Load ( ) ; dropped > 0 {
2020-06-25 20:48:30 +00:00
level . Error ( s . qm . logger ) . Log ( "msg" , "Failed to flush all samples on shutdown" , "count" , dropped )
}
2021-05-06 20:53:52 +00:00
if dropped := s . exemplarsDroppedOnHardShutdown . Load ( ) ; dropped > 0 {
level . Error ( s . qm . logger ) . Log ( "msg" , "Failed to flush all exemplars on shutdown" , "count" , dropped )
}
2017-05-10 09:44:13 +00:00
}
2022-03-03 17:40:09 +00:00
// enqueue data (sample or exemplar). If the shard is full, shutting down, or
// resharding, it will return false; in this case, you should back off and
// retry. A shard is full when its configured capacity has been reached,
// specifically, when s.queues[shard] has filled its batchQueue channel and the
// partial batch has also been filled.
2021-12-03 14:30:42 +00:00
func ( s * shards ) enqueue ( ref chunks . HeadSeriesRef , data sampleOrExemplar ) bool {
2018-09-07 21:26:04 +00:00
s . mtx . RLock ( )
defer s . mtx . RUnlock ( )
2017-05-10 09:44:13 +00:00
2018-09-07 21:26:04 +00:00
shard := uint64 ( ref ) % uint64 ( len ( s . queues ) )
2017-05-10 09:44:13 +00:00
select {
2018-09-07 21:26:04 +00:00
case <- s . softShutdown :
return false
2021-11-19 21:11:32 +00:00
default :
2022-03-03 17:40:09 +00:00
appended := s . queues [ shard ] . Append ( data )
2021-11-19 21:11:32 +00:00
if ! appended {
return false
}
2021-12-09 14:40:44 +00:00
if data . isSample {
2021-05-06 20:53:52 +00:00
s . qm . metrics . pendingSamples . Inc ( )
s . enqueuedSamples . Inc ( )
2021-12-09 14:40:44 +00:00
} else {
2021-05-06 20:53:52 +00:00
s . qm . metrics . pendingExemplars . Inc ( )
s . enqueuedExemplars . Inc ( )
}
2017-05-10 09:44:13 +00:00
return true
}
}
2021-11-19 21:11:32 +00:00
type queue struct {
2022-03-03 17:40:09 +00:00
// batchMtx covers operations appending to or publishing the partial batch.
2022-02-11 14:07:41 +00:00
batchMtx sync . Mutex
2021-12-03 14:30:42 +00:00
batch [ ] sampleOrExemplar
batchQueue chan [ ] sampleOrExemplar
2021-11-19 21:11:32 +00:00
// Since we know there are a limited number of batches out, using a stack
// is easy and safe so a sync.Pool is not necessary.
2022-02-11 14:07:41 +00:00
// poolMtx covers adding and removing batches from the batchPool.
poolMtx sync . Mutex
2021-12-03 14:30:42 +00:00
batchPool [ ] [ ] sampleOrExemplar
2021-11-19 21:11:32 +00:00
}
2021-12-03 14:30:42 +00:00
// sampleOrExemplar is one queued datum. When isSample is true,
// populateTimeSeries encodes value and timestamp as a sample; otherwise it
// encodes them, together with exemplarLabels, as an exemplar. seriesLabels
// identifies the series in both cases.
type sampleOrExemplar struct {
	seriesLabels   labels.Labels
	value          float64
	timestamp      int64
	exemplarLabels labels.Labels
	isSample       bool
}
2021-11-19 21:11:32 +00:00
func newQueue ( batchSize , capacity int ) * queue {
batches := capacity / batchSize
2022-03-03 17:40:09 +00:00
// Always create an unbuffered channel even if capacity is configured to be
// less than max_samples_per_send.
if batches == 0 {
batches = 1
}
2021-11-19 21:11:32 +00:00
return & queue {
2021-12-03 14:30:42 +00:00
batch : make ( [ ] sampleOrExemplar , 0 , batchSize ) ,
batchQueue : make ( chan [ ] sampleOrExemplar , batches ) ,
2021-11-19 21:11:32 +00:00
// batchPool should have capacity for everything in the channel + 1 for
// the batch being processed.
2021-12-03 14:30:42 +00:00
batchPool : make ( [ ] [ ] sampleOrExemplar , 0 , batches + 1 ) ,
2021-11-19 21:11:32 +00:00
}
}
2022-03-03 17:40:09 +00:00
// Append the sampleOrExemplar to the buffered batch. Returns false if it
// cannot be added and must be retried.
func ( q * queue ) Append ( datum sampleOrExemplar ) bool {
q . batchMtx . Lock ( )
defer q . batchMtx . Unlock ( )
2021-11-19 21:11:32 +00:00
q . batch = append ( q . batch , datum )
if len ( q . batch ) == cap ( q . batch ) {
select {
case q . batchQueue <- q . batch :
q . batch = q . newBatch ( cap ( q . batch ) )
return true
2022-03-03 17:40:09 +00:00
default :
2021-11-19 21:11:32 +00:00
// Remove the sample we just appended. It will get retried.
q . batch = q . batch [ : len ( q . batch ) - 1 ]
return false
}
}
return true
}
2021-12-03 14:30:42 +00:00
func ( q * queue ) Chan ( ) <- chan [ ] sampleOrExemplar {
2021-11-19 21:11:32 +00:00
return q . batchQueue
}
2022-03-03 17:40:09 +00:00
// Batch returns the current batch and allocates a new batch.
2021-12-03 14:30:42 +00:00
func ( q * queue ) Batch ( ) [ ] sampleOrExemplar {
2022-02-11 14:07:41 +00:00
q . batchMtx . Lock ( )
defer q . batchMtx . Unlock ( )
2022-03-03 17:40:09 +00:00
select {
case batch := <- q . batchQueue :
return batch
default :
batch := q . batch
q . batch = q . newBatch ( cap ( batch ) )
return batch
}
2021-11-19 21:11:32 +00:00
}
// ReturnForReuse adds the batch buffer back to the internal pool.
2021-12-03 14:30:42 +00:00
func ( q * queue ) ReturnForReuse ( batch [ ] sampleOrExemplar ) {
2022-02-11 14:07:41 +00:00
q . poolMtx . Lock ( )
defer q . poolMtx . Unlock ( )
2021-11-19 21:11:32 +00:00
if len ( q . batchPool ) < cap ( q . batchPool ) {
q . batchPool = append ( q . batchPool , batch [ : 0 ] )
}
}
// FlushAndShutdown stops the queue and flushes any samples. No appends can be
// made after this is called.
func ( q * queue ) FlushAndShutdown ( done <- chan struct { } ) {
2022-02-11 14:07:41 +00:00
q . batchMtx . Lock ( )
defer q . batchMtx . Unlock ( )
2021-11-19 21:11:32 +00:00
if len ( q . batch ) > 0 {
select {
case q . batchQueue <- q . batch :
case <- done :
// The shard has been hard shut down, so no more samples can be
// sent. Drop everything left in the queue.
}
}
q . batch = nil
close ( q . batchQueue )
}
2021-12-03 14:30:42 +00:00
func ( q * queue ) newBatch ( capacity int ) [ ] sampleOrExemplar {
2022-02-11 14:07:41 +00:00
q . poolMtx . Lock ( )
defer q . poolMtx . Unlock ( )
2021-11-19 21:11:32 +00:00
batches := len ( q . batchPool )
if batches > 0 {
batch := q . batchPool [ batches - 1 ]
q . batchPool = q . batchPool [ : batches - 1 ]
return batch
}
2021-12-03 14:30:42 +00:00
return make ( [ ] sampleOrExemplar , 0 , capacity )
2021-11-19 21:11:32 +00:00
}
func ( s * shards ) runShard ( ctx context . Context , shardID int , queue * queue ) {
2018-02-01 13:20:38 +00:00
defer func ( ) {
2020-07-30 07:45:42 +00:00
if s . running . Dec ( ) == 0 {
2018-02-01 13:20:38 +00:00
close ( s . done )
}
} ( )
2019-08-12 16:22:02 +00:00
shardNum := strconv . Itoa ( shardID )
2017-05-10 09:44:13 +00:00
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
2021-05-06 20:53:52 +00:00
// If we have fewer samples than that, flush them out after a deadline anyways.
2019-08-12 16:22:02 +00:00
var (
2021-11-19 21:11:32 +00:00
max = s . qm . cfg . MaxSamplesPerSend
2021-05-06 20:53:52 +00:00
2021-10-29 21:44:40 +00:00
pBuf = proto . NewBuffer ( nil )
buf [ ] byte
2019-08-12 16:22:02 +00:00
)
2021-05-06 20:53:52 +00:00
if s . qm . sendExemplars {
2021-08-09 21:20:53 +00:00
max += int ( float64 ( max ) * 0.1 )
2021-05-06 20:53:52 +00:00
}
2021-11-19 21:11:32 +00:00
batchQueue := queue . Chan ( )
2021-10-22 08:06:44 +00:00
pendingData := make ( [ ] prompb . TimeSeries , max )
2021-08-09 21:20:53 +00:00
for i := range pendingData {
pendingData [ i ] . Samples = [ ] prompb . Sample { { } }
if s . qm . sendExemplars {
pendingData [ i ] . Exemplars = [ ] prompb . Exemplar { { } }
}
}
2019-06-27 18:48:21 +00:00
2018-08-24 14:55:21 +00:00
timer := time . NewTimer ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2018-03-12 14:27:48 +00:00
stop := func ( ) {
2018-03-09 12:00:26 +00:00
if ! timer . Stop ( ) {
select {
case <- timer . C :
default :
}
}
2018-03-12 14:27:48 +00:00
}
defer stop ( )
2018-01-24 12:36:29 +00:00
2017-05-10 09:44:13 +00:00
for {
select {
2018-09-07 21:26:04 +00:00
case <- ctx . Done ( ) :
2020-06-25 20:48:30 +00:00
// In this case we drop all samples in the buffer and the queue.
// Remove them from pending and mark them as failed.
2021-11-19 21:11:32 +00:00
droppedSamples := int ( s . enqueuedSamples . Load ( ) )
droppedExemplars := int ( s . enqueuedExemplars . Load ( ) )
2020-06-25 20:48:30 +00:00
s . qm . metrics . pendingSamples . Sub ( float64 ( droppedSamples ) )
2021-05-06 20:53:52 +00:00
s . qm . metrics . pendingExemplars . Sub ( float64 ( droppedExemplars ) )
2020-06-25 20:48:30 +00:00
s . qm . metrics . failedSamplesTotal . Add ( float64 ( droppedSamples ) )
2021-05-06 20:53:52 +00:00
s . qm . metrics . failedExemplarsTotal . Add ( float64 ( droppedExemplars ) )
s . samplesDroppedOnHardShutdown . Add ( uint32 ( droppedSamples ) )
s . exemplarsDroppedOnHardShutdown . Add ( uint32 ( droppedExemplars ) )
2018-05-29 08:51:29 +00:00
return
2021-11-19 21:11:32 +00:00
case batch , ok := <- batchQueue :
2017-05-10 09:44:13 +00:00
if ! ok {
return
}
2021-11-19 21:11:32 +00:00
nPendingSamples , nPendingExemplars := s . populateTimeSeries ( batch , pendingData )
queue . ReturnForReuse ( batch )
n := nPendingSamples + nPendingExemplars
s . sendSamples ( ctx , pendingData [ : n ] , nPendingSamples , nPendingExemplars , pBuf , & buf )
stop ( )
timer . Reset ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2018-03-09 12:00:26 +00:00
2018-01-24 12:36:29 +00:00
case <- timer . C :
2022-03-03 17:40:09 +00:00
batch := queue . Batch ( )
2021-11-19 21:11:32 +00:00
if len ( batch ) > 0 {
nPendingSamples , nPendingExemplars := s . populateTimeSeries ( batch , pendingData )
n := nPendingSamples + nPendingExemplars
2021-05-06 20:53:52 +00:00
level . Debug ( s . qm . logger ) . Log ( "msg" , "runShard timer ticked, sending buffered data" , "samples" , nPendingSamples , "exemplars" , nPendingExemplars , "shard" , shardNum )
2021-11-19 21:11:32 +00:00
s . sendSamples ( ctx , pendingData [ : n ] , nPendingSamples , nPendingExemplars , pBuf , & buf )
2017-05-10 09:44:13 +00:00
}
2021-11-19 21:11:32 +00:00
queue . ReturnForReuse ( batch )
2018-08-24 14:55:21 +00:00
timer . Reset ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2017-05-10 09:44:13 +00:00
}
}
}
2021-12-03 14:30:42 +00:00
func ( s * shards ) populateTimeSeries ( batch [ ] sampleOrExemplar , pendingData [ ] prompb . TimeSeries ) ( int , int ) {
2021-11-19 21:11:32 +00:00
var nPendingSamples , nPendingExemplars int
2021-12-03 14:30:42 +00:00
for nPending , d := range batch {
2021-11-19 21:11:32 +00:00
pendingData [ nPending ] . Samples = pendingData [ nPending ] . Samples [ : 0 ]
if s . qm . sendExemplars {
pendingData [ nPending ] . Exemplars = pendingData [ nPending ] . Exemplars [ : 0 ]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
2021-12-09 14:40:44 +00:00
if d . isSample {
2021-11-19 21:11:32 +00:00
pendingData [ nPending ] . Labels = labelsToLabelsProto ( d . seriesLabels , pendingData [ nPending ] . Labels )
2021-12-03 14:30:42 +00:00
pendingData [ nPending ] . Samples = append ( pendingData [ nPending ] . Samples , prompb . Sample {
Value : d . value ,
Timestamp : d . timestamp ,
} )
2021-11-19 21:11:32 +00:00
nPendingSamples ++
2021-12-09 14:40:44 +00:00
} else {
2021-11-19 21:11:32 +00:00
pendingData [ nPending ] . Labels = labelsToLabelsProto ( d . seriesLabels , pendingData [ nPending ] . Labels )
2021-12-03 14:30:42 +00:00
pendingData [ nPending ] . Exemplars = append ( pendingData [ nPending ] . Exemplars , prompb . Exemplar {
Labels : labelsToLabelsProto ( d . exemplarLabels , nil ) ,
Value : d . value ,
Timestamp : d . timestamp ,
} )
2021-11-19 21:11:32 +00:00
nPendingExemplars ++
}
}
return nPendingSamples , nPendingExemplars
}
2021-10-22 08:06:44 +00:00
func ( s * shards ) sendSamples ( ctx context . Context , samples [ ] prompb . TimeSeries , sampleCount , exemplarCount int , pBuf * proto . Buffer , buf * [ ] byte ) {
2017-05-10 09:44:13 +00:00
begin := time . Now ( )
2021-10-29 21:44:40 +00:00
err := s . sendSamplesWithBackoff ( ctx , samples , sampleCount , exemplarCount , pBuf , buf )
2019-02-12 14:58:25 +00:00
if err != nil {
2021-05-06 20:53:52 +00:00
level . Error ( s . qm . logger ) . Log ( "msg" , "non-recoverable error" , "count" , sampleCount , "exemplarCount" , exemplarCount , "err" , err )
s . qm . metrics . failedSamplesTotal . Add ( float64 ( sampleCount ) )
s . qm . metrics . failedExemplarsTotal . Add ( float64 ( exemplarCount ) )
2018-09-07 21:26:04 +00:00
}
2017-05-10 09:44:13 +00:00
2018-04-08 09:51:54 +00:00
// These counters are used to calculate the dynamic sharding, and as such
2017-05-10 09:44:13 +00:00
// should be maintained irrespective of success or failure.
2021-05-06 20:53:52 +00:00
s . qm . dataOut . incr ( int64 ( len ( samples ) ) )
s . qm . dataOutDuration . incr ( int64 ( time . Since ( begin ) ) )
2020-07-30 07:45:42 +00:00
s . qm . lastSendTimestamp . Store ( time . Now ( ) . Unix ( ) )
2021-11-30 17:53:04 +00:00
// Pending samples/exemplars also should be subtracted as an error means
// they will not be retried.
s . qm . metrics . pendingSamples . Sub ( float64 ( sampleCount ) )
s . qm . metrics . pendingExemplars . Sub ( float64 ( exemplarCount ) )
s . enqueuedSamples . Sub ( int64 ( sampleCount ) )
s . enqueuedExemplars . Sub ( int64 ( exemplarCount ) )
2017-05-10 09:44:13 +00:00
}
// sendSamples to the remote storage with backoff for recoverable errors.
2021-10-22 08:06:44 +00:00
func ( s * shards ) sendSamplesWithBackoff ( ctx context . Context , samples [ ] prompb . TimeSeries , sampleCount , exemplarCount int , pBuf * proto . Buffer , buf * [ ] byte ) error {
2020-11-19 15:23:03 +00:00
// Build the WriteRequest with no metadata.
2021-10-29 21:44:40 +00:00
req , highest , err := buildWriteRequest ( samples , nil , pBuf , * buf )
2018-09-07 21:26:04 +00:00
if err != nil {
2019-03-01 19:04:26 +00:00
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
2018-09-07 21:26:04 +00:00
return err
}
2019-03-01 19:04:26 +00:00
2021-11-03 09:10:31 +00:00
reqSize := len ( req )
2020-06-01 15:21:13 +00:00
* buf = req
// An anonymous function allows us to defer the completion of our per-try spans
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3.
2020-11-19 15:23:03 +00:00
attemptStore := func ( try int ) error {
2022-01-25 10:08:04 +00:00
ctx , span := otel . Tracer ( "" ) . Start ( ctx , "Remote Send Batch" )
defer span . End ( )
span . SetAttributes (
attribute . Int ( "request_size" , reqSize ) ,
attribute . Int ( "samples" , sampleCount ) ,
attribute . Int ( "try" , try ) ,
attribute . String ( "remote_name" , s . qm . storeClient . Name ( ) ) ,
attribute . String ( "remote_url" , s . qm . storeClient . Endpoint ( ) ) ,
)
2020-06-01 15:21:13 +00:00
2021-05-06 20:53:52 +00:00
if exemplarCount > 0 {
2022-01-25 10:08:04 +00:00
span . SetAttributes ( attribute . Int ( "exemplars" , exemplarCount ) )
2021-05-06 20:53:52 +00:00
}
2020-06-01 15:21:13 +00:00
begin := time . Now ( )
2020-11-19 15:23:03 +00:00
s . qm . metrics . samplesTotal . Add ( float64 ( sampleCount ) )
2021-05-06 20:53:52 +00:00
s . qm . metrics . exemplarsTotal . Add ( float64 ( exemplarCount ) )
2020-06-01 15:21:13 +00:00
err := s . qm . client ( ) . Store ( ctx , * buf )
s . qm . metrics . sentBatchDuration . Observe ( time . Since ( begin ) . Seconds ( ) )
if err != nil {
2022-01-25 10:08:04 +00:00
span . RecordError ( err )
2020-06-01 15:21:13 +00:00
return err
}
return nil
}
2020-11-19 15:23:03 +00:00
onRetry := func ( ) {
s . qm . metrics . retriedSamplesTotal . Add ( float64 ( sampleCount ) )
2021-05-06 20:53:52 +00:00
s . qm . metrics . retriedExemplarsTotal . Add ( float64 ( exemplarCount ) )
2020-11-19 15:23:03 +00:00
}
2021-02-04 13:38:32 +00:00
err = sendWriteRequestWithBackoff ( ctx , s . qm . cfg , s . qm . logger , attemptStore , onRetry )
2022-01-04 09:46:52 +00:00
if errors . Is ( err , context . Canceled ) {
// When there is resharding, we cancel the context for this queue, which means the data is not sent.
// So we exit early to not update the metrics.
return err
}
2022-01-03 10:13:48 +00:00
2021-05-06 20:53:52 +00:00
s . qm . metrics . sentBytesTotal . Add ( float64 ( reqSize ) )
2020-11-19 15:23:03 +00:00
s . qm . metrics . highestSentTimestamp . Set ( float64 ( highest / 1000 ) )
2022-01-03 10:13:48 +00:00
return err
2020-11-19 15:23:03 +00:00
}
2021-02-04 13:38:32 +00:00
func sendWriteRequestWithBackoff ( ctx context . Context , cfg config . QueueConfig , l log . Logger , attempt func ( int ) error , onRetry func ( ) ) error {
2020-11-19 15:23:03 +00:00
backoff := cfg . MinBackoff
2021-02-10 22:25:37 +00:00
sleepDuration := model . Duration ( 0 )
2020-11-19 15:23:03 +00:00
try := 0
2018-09-07 21:26:04 +00:00
for {
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
2017-05-10 09:44:13 +00:00
2020-11-19 15:23:03 +00:00
err := attempt ( try )
2018-09-07 21:26:04 +00:00
2020-11-19 15:23:03 +00:00
if err == nil {
return nil
}
2017-05-10 09:44:13 +00:00
2020-11-19 15:23:03 +00:00
// If the error is unrecoverable, we should not retry.
2021-02-10 22:25:37 +00:00
backoffErr , ok := err . ( RecoverableError )
if ! ok {
2020-11-19 15:23:03 +00:00
return err
}
2020-06-01 15:21:13 +00:00
2021-02-10 22:25:37 +00:00
sleepDuration = backoff
if backoffErr . retryAfter > 0 {
sleepDuration = backoffErr . retryAfter
level . Info ( l ) . Log ( "msg" , "Retrying after duration specified by Retry-After header" , "duration" , sleepDuration )
} else if backoffErr . retryAfter < 0 {
level . Debug ( l ) . Log ( "msg" , "retry-after cannot be in past, retrying using default backoff mechanism" )
}
select {
case <- ctx . Done ( ) :
case <- time . After ( time . Duration ( sleepDuration ) ) :
}
2020-11-19 15:23:03 +00:00
// If we make it this far, we've encountered a recoverable error and will retry.
onRetry ( )
2021-01-27 16:38:34 +00:00
level . Warn ( l ) . Log ( "msg" , "Failed to send batch, retrying" , "err" , err )
2018-09-07 21:26:04 +00:00
2021-02-10 22:25:37 +00:00
backoff = sleepDuration * 2
2020-11-19 15:23:03 +00:00
if backoff > cfg . MaxBackoff {
backoff = cfg . MaxBackoff
2017-05-10 09:44:13 +00:00
}
2020-06-01 15:21:13 +00:00
2020-11-19 15:23:03 +00:00
try ++
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
}
2021-10-29 21:44:40 +00:00
func buildWriteRequest ( samples [ ] prompb . TimeSeries , metadata [ ] prompb . MetricMetadata , pBuf * proto . Buffer , buf [ ] byte ) ( [ ] byte , int64 , error ) {
2018-09-07 21:26:04 +00:00
var highest int64
for _ , ts := range samples {
2021-05-06 20:53:52 +00:00
// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len ( ts . Samples ) > 0 && ts . Samples [ 0 ] . Timestamp > highest {
2018-09-07 21:26:04 +00:00
highest = ts . Samples [ 0 ] . Timestamp
}
2021-05-06 20:53:52 +00:00
if len ( ts . Exemplars ) > 0 && ts . Exemplars [ 0 ] . Timestamp > highest {
highest = ts . Exemplars [ 0 ] . Timestamp
}
2018-09-07 21:26:04 +00:00
}
2020-11-19 15:23:03 +00:00
2018-09-07 21:26:04 +00:00
req := & prompb . WriteRequest {
Timeseries : samples ,
2020-11-19 15:23:03 +00:00
Metadata : metadata ,
2018-09-07 21:26:04 +00:00
}
2021-10-29 21:44:40 +00:00
if pBuf == nil {
pBuf = proto . NewBuffer ( nil ) // For convenience in tests. Not efficient.
} else {
pBuf . Reset ( )
}
err := pBuf . Marshal ( req )
2018-09-07 21:26:04 +00:00
if err != nil {
return nil , highest , err
}
2017-05-10 09:44:13 +00:00
2019-06-27 18:48:21 +00:00
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
buf = buf [ 0 : cap ( buf ) ]
}
2021-10-29 21:44:40 +00:00
compressed := snappy . Encode ( buf , pBuf . Bytes ( ) )
2018-09-07 21:26:04 +00:00
return compressed , highest , nil
2017-05-10 09:44:13 +00:00
}