|
|
|
@ -34,8 +34,7 @@ type TombstoneGC struct {
|
|
|
|
|
// expireCh is used to stream expiration to the leader for processing.
|
|
|
|
|
expireCh chan uint64
|
|
|
|
|
|
|
|
|
|
// lock is used to ensure safe access to all the fields.
|
|
|
|
|
lock sync.Mutex
|
|
|
|
|
sync.Mutex
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// expireInterval is used to track the maximum index to expire in a given
|
|
|
|
@ -77,8 +76,8 @@ func (t *TombstoneGC) ExpireCh() <-chan uint64 {
|
|
|
|
|
// SetEnabled is used to control if the tombstone GC is
|
|
|
|
|
// enabled. Should only be enabled by the leader node.
|
|
|
|
|
func (t *TombstoneGC) SetEnabled(enabled bool) {
|
|
|
|
|
t.lock.Lock()
|
|
|
|
|
defer t.lock.Unlock()
|
|
|
|
|
t.Lock()
|
|
|
|
|
defer t.Unlock()
|
|
|
|
|
if enabled == t.enabled {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@ -100,8 +99,8 @@ func (t *TombstoneGC) SetEnabled(enabled bool) {
|
|
|
|
|
func (t *TombstoneGC) Hint(index uint64) {
|
|
|
|
|
expires := t.nextExpires()
|
|
|
|
|
|
|
|
|
|
t.lock.Lock()
|
|
|
|
|
defer t.lock.Unlock()
|
|
|
|
|
t.Lock()
|
|
|
|
|
defer t.Unlock()
|
|
|
|
|
if !t.enabled {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@ -127,8 +126,8 @@ func (t *TombstoneGC) Hint(index uint64) {
|
|
|
|
|
|
|
|
|
|
// PendingExpiration is used to check if any expirations are pending.
|
|
|
|
|
func (t *TombstoneGC) PendingExpiration() bool {
|
|
|
|
|
t.lock.Lock()
|
|
|
|
|
defer t.lock.Unlock()
|
|
|
|
|
t.Lock()
|
|
|
|
|
defer t.Unlock()
|
|
|
|
|
|
|
|
|
|
return len(t.expires) > 0
|
|
|
|
|
}
|
|
|
|
@ -145,8 +144,8 @@ func (t *TombstoneGC) nextExpires() time.Time {
|
|
|
|
|
|
|
|
|
|
// expireTime is used to expire the entries at the given time.
|
|
|
|
|
func (t *TombstoneGC) expireTime(expires time.Time) {
|
|
|
|
|
t.lock.Lock()
|
|
|
|
|
defer t.lock.Unlock()
|
|
|
|
|
t.Lock()
|
|
|
|
|
defer t.Unlock()
|
|
|
|
|
|
|
|
|
|
// Get the maximum index and clear the entry. It's possible that the GC
|
|
|
|
|
// has been shut down while this timer fired and got blocked on the lock,
|
|
|
|
|