mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
200 lines
4.9 KiB
200 lines
4.9 KiB
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
package tsdb |
|
|
|
import ( |
|
"sync" |
|
) |
|
|
|
// isolationState holds the isolation information. |
|
type isolationState struct { |
|
// We will ignore all appends above the max, or that are incomplete. |
|
maxAppendID uint64 |
|
incompleteAppends map[uint64]struct{} |
|
lowWatermark uint64 // Lowest of incompleteAppends/maxAppendID. |
|
isolation *isolation |
|
|
|
// Doubly linked list of active reads. |
|
next *isolationState |
|
prev *isolationState |
|
} |
|
|
|
// Close closes the state. |
|
func (i *isolationState) Close() { |
|
i.isolation.readMtx.Lock() |
|
defer i.isolation.readMtx.Unlock() |
|
i.next.prev = i.prev |
|
i.prev.next = i.next |
|
} |
|
|
|
// isolation is the global isolation state. |
|
type isolation struct { |
|
// Mutex for accessing lastAppendID and appendsOpen. |
|
appendMtx sync.Mutex |
|
// Each append is given an internal id. |
|
lastAppendID uint64 |
|
// Which appends are currently in progress. |
|
appendsOpen map[uint64]struct{} |
|
// Mutex for accessing readsOpen. |
|
// If taking both appendMtx and readMtx, take appendMtx first. |
|
readMtx sync.Mutex |
|
// All current in use isolationStates. This is a doubly-linked list. |
|
readsOpen *isolationState |
|
} |
|
|
|
func newIsolation() *isolation { |
|
isoState := &isolationState{} |
|
isoState.next = isoState |
|
isoState.prev = isoState |
|
|
|
return &isolation{ |
|
appendsOpen: map[uint64]struct{}{}, |
|
readsOpen: isoState, |
|
} |
|
} |
|
|
|
// lowWatermark returns the appendID below which we no longer need to track |
|
// which appends were from which appendID. |
|
func (i *isolation) lowWatermark() uint64 { |
|
i.appendMtx.Lock() // Take appendMtx first. |
|
defer i.appendMtx.Unlock() |
|
i.readMtx.Lock() |
|
defer i.readMtx.Unlock() |
|
if i.readsOpen.prev != i.readsOpen { |
|
return i.readsOpen.prev.lowWatermark |
|
} |
|
lw := i.lastAppendID |
|
for k := range i.appendsOpen { |
|
if k < lw { |
|
lw = k |
|
} |
|
} |
|
return lw |
|
} |
|
|
|
// State returns an object used to control isolation |
|
// between a query and appends. Must be closed when complete. |
|
func (i *isolation) State() *isolationState { |
|
i.appendMtx.Lock() // Take append mutex before read mutex. |
|
defer i.appendMtx.Unlock() |
|
isoState := &isolationState{ |
|
maxAppendID: i.lastAppendID, |
|
lowWatermark: i.lastAppendID, |
|
incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)), |
|
isolation: i, |
|
} |
|
for k := range i.appendsOpen { |
|
isoState.incompleteAppends[k] = struct{}{} |
|
if k < isoState.lowWatermark { |
|
isoState.lowWatermark = k |
|
} |
|
} |
|
|
|
i.readMtx.Lock() |
|
defer i.readMtx.Unlock() |
|
isoState.prev = i.readsOpen |
|
isoState.next = i.readsOpen.next |
|
i.readsOpen.next.prev = isoState |
|
i.readsOpen.next = isoState |
|
return isoState |
|
} |
|
|
|
// newAppendID increments the transaction counter and returns a new transaction |
|
// ID. The first ID returned is 1. |
|
func (i *isolation) newAppendID() uint64 { |
|
i.appendMtx.Lock() |
|
defer i.appendMtx.Unlock() |
|
i.lastAppendID++ |
|
i.appendsOpen[i.lastAppendID] = struct{}{} |
|
return i.lastAppendID |
|
} |
|
|
|
func (i *isolation) closeAppend(appendID uint64) { |
|
i.appendMtx.Lock() |
|
defer i.appendMtx.Unlock() |
|
delete(i.appendsOpen, appendID) |
|
} |
|
|
|
// txRing is the transactionID ring buffer. The ids occupy txIDCount slots of
// txIDs starting at index txIDFirst and wrapping around at the end of the
// slice.
type txRing struct {
	txIDs     []uint64
	txIDFirst int // Position of the first id in the ring.
	txIDCount int // How many ids in the ring.
}
|
|
|
func newTxRing(cap int) *txRing { |
|
return &txRing{ |
|
txIDs: make([]uint64, cap), |
|
} |
|
} |
|
|
|
func (txr *txRing) add(appendID uint64) { |
|
if txr.txIDCount == len(txr.txIDs) { |
|
// Ring buffer is full, expand by doubling. |
|
newRing := make([]uint64, txr.txIDCount*2) |
|
idx := copy(newRing[:], txr.txIDs[txr.txIDFirst:]) |
|
copy(newRing[idx:], txr.txIDs[:txr.txIDFirst]) |
|
txr.txIDs = newRing |
|
txr.txIDFirst = 0 |
|
} |
|
|
|
txr.txIDs[(txr.txIDFirst+txr.txIDCount)%len(txr.txIDs)] = appendID |
|
txr.txIDCount++ |
|
} |
|
|
|
func (txr *txRing) cleanupAppendIDsBelow(bound uint64) { |
|
pos := txr.txIDFirst |
|
|
|
for txr.txIDCount > 0 { |
|
if txr.txIDs[pos] < bound { |
|
txr.txIDFirst++ |
|
txr.txIDCount-- |
|
} else { |
|
break |
|
} |
|
|
|
pos++ |
|
if pos == len(txr.txIDs) { |
|
pos = 0 |
|
} |
|
} |
|
|
|
txr.txIDFirst %= len(txr.txIDs) |
|
} |
|
|
|
func (txr *txRing) iterator() *txRingIterator { |
|
return &txRingIterator{ |
|
pos: txr.txIDFirst, |
|
ids: txr.txIDs, |
|
} |
|
} |
|
|
|
// txRingIterator lets you iterate over the ring. It doesn't terminate: it has
// no notion of an end and simply wraps around to the start of ids, so the
// caller has to decide when to stop.
type txRingIterator struct {
	ids []uint64

	pos int // Index into ids of the current id.
}
|
|
|
func (it *txRingIterator) At() uint64 { |
|
return it.ids[it.pos] |
|
} |
|
|
|
func (it *txRingIterator) Next() { |
|
it.pos++ |
|
if it.pos == len(it.ids) { |
|
it.pos = 0 |
|
} |
|
}
|
|
|