mirror of https://github.com/prometheus/prometheus

storage: Replace usage of sync/atomic with uber-go/atomic

Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>

pull/7683/head
parent 348ff4285f
commit bf6b063bfb
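The diff below swaps every raw sync/atomic call on plain integer fields for the wrapper types from go.uber.org/atomic. The commit message doesn't spell out the rationale, but the deleted comments hint at it: with sync/atomic, 64-bit fields had to be kept at the top of structs for alignment on 32-bit platforms, and nothing stopped an accidental non-atomic read or write of the same field. A minimal contrast of the two styles (the counter type here is hypothetical, not from the diff):

package main

import (
	"fmt"
	stdatomic "sync/atomic"

	"go.uber.org/atomic"
)

type oldCounter struct {
	// With sync/atomic the field is an ordinary int64, so it must come
	// first in the struct (64-bit alignment on 32-bit platforms) and every
	// access site must remember to go through the atomic functions.
	n int64
}

func (c *oldCounter) inc() int64 { return stdatomic.AddInt64(&c.n, 1) }

type newCounter struct {
	// The wrapper type has no exported field, so a plain c.n++ no longer
	// compiles; atomic access is the only possible access.
	n atomic.Int64
}

func (c *newCounter) inc() int64 { return c.n.Inc() }

func main() {
	o, n := &oldCounter{}, &newCounter{}
	fmt.Println(o.inc(), n.inc()) // 1 1
}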
@@ -15,15 +15,14 @@ package remote
 
 import (
 	"sync"
-	"sync/atomic"
 	"time"
+
+	"go.uber.org/atomic"
 )
 
 // ewmaRate tracks an exponentially weighted moving average of a per-second rate.
 type ewmaRate struct {
-	// Keep all 64bit atomically accessed variables at the top of this struct.
-	// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
-	newEvents int64
+	newEvents atomic.Int64
 
 	alpha    float64
 	interval time.Duration
@@ -50,7 +49,7 @@ func (r *ewmaRate) rate() float64 {
 
 // tick assumes to be called every r.interval.
 func (r *ewmaRate) tick() {
-	newEvents := atomic.SwapInt64(&r.newEvents, 0)
+	newEvents := r.newEvents.Swap(0)
 	instantRate := float64(newEvents) / r.interval.Seconds()
 
 	r.mutex.Lock()
@@ -66,5 +65,5 @@ func (r *ewmaRate) tick() {
 
 // inc counts one event.
 func (r *ewmaRate) incr(incr int64) {
-	atomic.AddInt64(&r.newEvents, incr)
+	r.newEvents.Add(incr)
 }
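For context, tick drains the event counter with an atomic swap and folds the instantaneous rate into the moving average under the mutex. A self-contained sketch of that update (lastRate and the EWMA arithmetic are assumptions based on the standard formula; only newEvents, alpha, interval, tick, and incr appear in the diff):

package main

import (
	"fmt"
	"sync"
	"time"

	"go.uber.org/atomic"
)

type ewmaRate struct {
	newEvents atomic.Int64

	alpha    float64
	interval time.Duration
	lastRate float64 // assumed field; holds the smoothed per-second rate
	mutex    sync.Mutex
}

func (r *ewmaRate) tick() {
	newEvents := r.newEvents.Swap(0) // reset the window atomically
	instantRate := float64(newEvents) / r.interval.Seconds()

	r.mutex.Lock()
	defer r.mutex.Unlock()
	// Standard EWMA update: move lastRate a fraction alpha toward instantRate.
	r.lastRate += r.alpha * (instantRate - r.lastRate)
}

func (r *ewmaRate) incr(incr int64) { r.newEvents.Add(incr) }

func main() {
	r := &ewmaRate{alpha: 0.2, interval: time.Second}
	r.incr(10)
	r.tick()
	fmt.Println(r.lastRate) // 2: one tick at 10 events/sec, smoothed by alpha
}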
@@ -20,10 +20,10 @@ package remote
 
 import (
 	"sync"
-	"sync/atomic"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+	"go.uber.org/atomic"
 )
 
 var interner = newPool()
@@ -40,13 +40,17 @@ type pool struct {
 }
 
 type entry struct {
-	// Keep all 64bit atomically accessed variables at the top of this struct.
-	// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for more info.
-	refs int64
+	refs atomic.Int64
 
 	s string
 }
 
+func newEntry(s string) *entry {
+	entry := &entry{s: s}
+	entry.refs.Store(1)
+	return entry
+}
+
 func newPool() *pool {
 	return &pool{
 		pool: map[string]*entry{},
@@ -62,20 +66,17 @@ func (p *pool) intern(s string) string {
 	interned, ok := p.pool[s]
 	p.mtx.RUnlock()
 	if ok {
-		atomic.AddInt64(&interned.refs, 1)
+		interned.refs.Inc()
 		return interned.s
 	}
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
 	if interned, ok := p.pool[s]; ok {
-		atomic.AddInt64(&interned.refs, 1)
+		interned.refs.Inc()
 		return interned.s
 	}
 
-	p.pool[s] = &entry{
-		s:    s,
-		refs: 1,
-	}
+	p.pool[s] = newEntry(s)
 	return s
 }
@@ -89,14 +90,14 @@ func (p *pool) release(s string) {
 		return
 	}
 
-	refs := atomic.AddInt64(&interned.refs, -1)
+	refs := interned.refs.Dec()
	if refs > 0 {
 		return
 	}
 
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
-	if atomic.LoadInt64(&interned.refs) != 0 {
+	if interned.refs.Load() != 0 {
 		return
 	}
 	delete(p.pool, s)
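The release path is the subtle part of this interner: the refcount is decremented outside the write lock, so by the time the lock is taken another goroutine may have re-interned the string; the refs.Load() != 0 re-check catches that and keeps the entry alive. A usage sketch against the methods shown above (the label string is made up):

// Hypothetical call sequence, not from the diff.
s := interner.intern("instance=localhost:9090") // miss: inserted via newEntry, refs = 1
_ = interner.intern(s)                          // hit under RLock: refs.Inc(), refs = 2
interner.release(s)                             // refs.Dec() -> 1, entry kept
interner.release(s)                             // refs.Dec() -> 0, re-checked under Lock, deleted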
@@ -20,7 +20,6 @@ package remote
 
 import (
 	"fmt"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -33,7 +32,7 @@ func TestIntern(t *testing.T) {
 	interned, ok := interner.pool[testString]
 
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
 }
 
 func TestIntern_MultiRef(t *testing.T) {
@@ -43,13 +42,13 @@ func TestIntern_MultiRef(t *testing.T) {
 	interned, ok := interner.pool[testString]
 
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
 
 	interner.intern(testString)
 	interned, ok = interner.pool[testString]
 
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, interned.refs == 2, fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 2, fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs))
 }
 
 func TestIntern_DeleteRef(t *testing.T) {
@@ -59,7 +58,7 @@ func TestIntern_DeleteRef(t *testing.T) {
 	interned, ok := interner.pool[testString]
 
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
 
 	interner.release(testString)
 	_, ok = interner.pool[testString]
@@ -72,7 +71,7 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) {
 	interner.intern(testString)
 	interned, ok := interner.pool[testString]
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
 
 	go interner.release(testString)
 
@@ -84,5 +83,5 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) {
 	interned, ok = interner.pool[testString]
 	interner.mtx.RUnlock()
 	testutil.Equals(t, true, ok)
-	testutil.Assert(t, atomic.LoadInt64(&interned.refs) == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
+	testutil.Assert(t, interned.refs.Load() == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs))
 }
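In the concurrent test, release runs in another goroutine while the assertion reads the refcount; with the wrapper type, refs.Load() is the only way to read it, which keeps the test clean under -race. A hypothetical helper in the same spirit (not part of the diff) that polls instead of sleeping a fixed amount:

// waitForValue polls an atomic counter through Load until it reaches want or
// the timeout expires. Reading only via Load stays race-free against a
// concurrent release(). Usage: waitForValue(&interned.refs, 1, time.Second).
func waitForValue(v *atomic.Int64, want int64, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if v.Load() == want {
			return true
		}
		time.Sleep(time.Millisecond)
	}
	return false
}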
@@ -18,7 +18,6 @@ import (
 	"math"
 	"strconv"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/go-kit/kit/log"
@@ -35,6 +34,8 @@ import (
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/tsdb/record"
 	"github.com/prometheus/prometheus/tsdb/wal"
+
+	"go.uber.org/atomic"
 )
 
 const (
@@ -235,8 +236,7 @@ type WriteClient interface {
 // indicated by the provided WriteClient. Implements writeTo interface
 // used by WAL Watcher.
 type QueueManager struct {
-	// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
-	lastSendTimestamp int64
+	lastSendTimestamp atomic.Int64
 
 	logger        log.Logger
 	flushDeadline time.Duration
@@ -537,7 +537,7 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
 	// We shouldn't reshard if Prometheus hasn't been able to send to the
 	// remote endpoint successfully within some period of time.
 	minSendTimestamp := time.Now().Add(-2 * time.Duration(t.cfg.BatchSendDeadline)).Unix()
-	lsts := atomic.LoadInt64(&t.lastSendTimestamp)
+	lsts := t.lastSendTimestamp.Load()
 	if lsts < minSendTimestamp {
 		level.Warn(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold", "lastSendTimestamp", lsts, "minSendTimestamp", minSendTimestamp)
 		return false
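Concretely, the guard in shouldReshard refuses to change the shard count while the remote end looks unhealthy: with an assumed BatchSendDeadline of 5s, minSendTimestamp is 10 seconds ago, and a lastSendTimestamp older than that skips resharding. In sketch form (not a literal excerpt; the deadline value is an assumption):

// healthyToReshard reports whether the last successful send is recent enough
// that resharding could help. lastSend is a Unix timestamp, as in the diff.
func healthyToReshard(lastSend int64) bool {
	const batchSendDeadline = 5 * time.Second
	minSendTimestamp := time.Now().Add(-2 * batchSendDeadline).Unix()
	return lastSend >= minSendTimestamp
}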
@@ -663,7 +663,7 @@ type shards struct {
 	// Emulate a wait group with a channel and an atomic int, as you
 	// cannot select on a wait group.
 	done    chan struct{}
-	running int32
+	running atomic.Int32
 
 	// Soft shutdown context will prevent new enqueues and deadlocks.
 	softShutdown chan struct{}
@@ -671,7 +671,7 @@ type shards struct {
 	// Hard shutdown context is used to terminate outgoing HTTP connections
 	// after giving them a chance to terminate.
 	hardShutdown          context.CancelFunc
-	droppedOnHardShutdown uint32
+	droppedOnHardShutdown atomic.Uint32
 }
 
 // start the shards; must be called before any call to enqueue.
@@ -692,9 +692,9 @@ func (s *shards) start(n int) {
 	var hardShutdownCtx context.Context
 	hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background())
 	s.softShutdown = make(chan struct{})
-	s.running = int32(n)
+	s.running.Store(int32(n))
 	s.done = make(chan struct{})
-	atomic.StoreUint32(&s.droppedOnHardShutdown, 0)
+	s.droppedOnHardShutdown.Store(0)
 	for i := 0; i < n; i++ {
 		go s.runShard(hardShutdownCtx, i, newQueues[i])
 	}
@@ -727,7 +727,7 @@ func (s *shards) stop() {
 	// Force an unclean shutdown.
 	s.hardShutdown()
 	<-s.done
-	if dropped := atomic.LoadUint32(&s.droppedOnHardShutdown); dropped > 0 {
+	if dropped := s.droppedOnHardShutdown.Load(); dropped > 0 {
 		level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown", "count", dropped)
 	}
 }
@@ -756,7 +756,7 @@ func (s *shards) enqueue(ref uint64, sample sample) bool {
 
 func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
 	defer func() {
-		if atomic.AddInt32(&s.running, -1) == 0 {
+		if s.running.Dec() == 0 {
 			close(s.done)
 		}
 	}()
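The shards comment above spells out the trick: a sync.WaitGroup cannot appear in a select, so the code pairs an atomic counter with a channel that the last-exiting worker closes. A self-contained sketch of that pattern (the worker bodies and the timeout are assumptions, not from the diff):

package main

import (
	"fmt"
	"time"

	"go.uber.org/atomic"
)

func main() {
	const n = 4
	var running atomic.Int32
	running.Store(n)
	done := make(chan struct{})

	for i := 0; i < n; i++ {
		go func(id int) {
			defer func() {
				if running.Dec() == 0 {
					close(done) // exactly one goroutine observes zero
				}
			}()
			// ... do shard work ...
		}(i)
	}

	select {
	case <-done:
		fmt.Println("all workers exited")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting") // selectable, unlike WaitGroup.Wait
	}
}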
@@ -792,7 +792,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan sample) {
 			droppedSamples := nPending + len(queue)
 			s.qm.metrics.pendingSamples.Sub(float64(droppedSamples))
 			s.qm.metrics.failedSamplesTotal.Add(float64(droppedSamples))
-			atomic.AddUint32(&s.droppedOnHardShutdown, uint32(droppedSamples))
+			s.droppedOnHardShutdown.Add(uint32(droppedSamples))
 			return
 
 		case sample, ok := <-queue:
@@ -847,7 +847,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
 	// should be maintained irrespective of success or failure.
 	s.qm.samplesOut.incr(int64(len(samples)))
 	s.qm.samplesOutDuration.incr(int64(time.Since(begin)))
-	atomic.StoreInt64(&s.qm.lastSendTimestamp, time.Now().Unix())
+	s.qm.lastSendTimestamp.Store(time.Now().Unix())
 }
 
 // sendSamples to the remote storage with backoff for recoverable errors.
@@ -24,7 +24,6 @@ import (
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -41,6 +40,7 @@ import (
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/tsdb/record"
 	"github.com/prometheus/prometheus/util/testutil"
+	"go.uber.org/atomic"
 )
 
 const defaultFlushDeadline = 1 * time.Minute
@@ -336,7 +336,7 @@ func TestShouldReshard(t *testing.T) {
 		m.numShards = c.startingShards
 		m.samplesIn.incr(c.samplesIn)
 		m.samplesOut.incr(c.samplesOut)
-		m.lastSendTimestamp = c.lastSendTimestamp
+		m.lastSendTimestamp.Store(c.lastSendTimestamp)
 
 		m.Start()
 
@@ -497,7 +497,7 @@ func (c *TestWriteClient) Endpoint() string {
 // point the `numCalls` property will contain a count of how many times Store()
 // was called.
 type TestBlockingWriteClient struct {
-	numCalls uint64
+	numCalls atomic.Uint64
 }
 
 func NewTestBlockedWriteClient() *TestBlockingWriteClient {
@@ -505,13 +505,13 @@ func NewTestBlockedWriteClient() *TestBlockingWriteClient {
 }
 
 func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte) error {
-	atomic.AddUint64(&c.numCalls, 1)
+	c.numCalls.Inc()
 	<-ctx.Done()
 	return nil
 }
 
 func (c *TestBlockingWriteClient) NumCalls() uint64 {
-	return atomic.LoadUint64(&c.numCalls)
+	return c.numCalls.Load()
 }
 
 func (c *TestBlockingWriteClient) Name() string {
@@ -667,7 +667,7 @@ func TestCalculateDesiredShards(t *testing.T) {
 		highestSent := startedAt.Add(ts - time.Duration(pendingSamples/inputRate)*time.Second)
 		m.metrics.highestSentTimestamp.Set(float64(highestSent.Unix()))
 
-		atomic.StoreInt64(&m.lastSendTimestamp, time.Now().Unix())
+		m.lastSendTimestamp.Store(time.Now().Unix())
 	}
 
 	ts := time.Duration(0)