|
|
|
@ -15,6 +15,7 @@ package remote
|
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"errors" |
|
|
|
|
"math" |
|
|
|
|
"strconv" |
|
|
|
|
"sync" |
|
|
|
@ -1311,12 +1312,16 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry) |
|
|
|
|
if err != nil { |
|
|
|
|
if errors.Is(err, context.Canceled) { |
|
|
|
|
// When there is resharding, we cancel the context for this queue, which means the data is not sent.
|
|
|
|
|
// So we exit early to not update the metrics.
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s.qm.metrics.sentBytesTotal.Add(float64(reqSize)) |
|
|
|
|
s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000)) |
|
|
|
|
return nil |
|
|
|
|
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error { |
|
|
|
|