|
|
|
@ -431,7 +431,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
|
|
|
|
|
retry := func() { |
|
|
|
|
t.metrics.retriedMetadataTotal.Add(float64(len(metadata))) |
|
|
|
|
} |
|
|
|
|
err = sendWriteRequestWithBackoff(ctx, t.cfg, t.client(), t.logger, req, attemptStore, retry) |
|
|
|
|
err = sendWriteRequestWithBackoff(ctx, t.cfg, t.logger, attemptStore, retry) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -1031,7 +1031,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
|
|
|
|
|
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.client(), s.qm.logger, req, attemptStore, onRetry) |
|
|
|
|
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.logger, attemptStore, onRetry) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -1040,7 +1040,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, s WriteClient, l log.Logger, req []byte, attempt func(int) error, onRetry func()) error { |
|
|
|
|
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l log.Logger, attempt func(int) error, onRetry func()) error { |
|
|
|
|
backoff := cfg.MinBackoff |
|
|
|
|
try := 0 |
|
|
|
|
|
|
|
|
|