2019-08-22 05:12:46 +00:00
package sqllog
import (
"context"
"database/sql"
"strings"
"time"
2020-11-30 23:45:22 +00:00
"github.com/k3s-io/kine/pkg/broadcaster"
"github.com/k3s-io/kine/pkg/server"
2020-10-28 07:38:43 +00:00
"github.com/pkg/errors"
2019-08-22 05:12:46 +00:00
"github.com/sirupsen/logrus"
)
2020-10-28 07:38:43 +00:00
// Compaction tuning. compactMinRetain and compactBatchSize are left untyped so
// they mix freely with the int64 revisions they are compared against.
const (
	// compactInterval is the period of the background compactor's ticker.
	compactInterval = 5 * time.Minute
	// compactTimeout bounds the context of each compaction batch.
	compactTimeout = 5 * time.Second
	// compactMinRetain is how many of the most recent revisions are never compacted.
	compactMinRetain = 1000
	// compactBatchSize is the maximum number of revisions advanced per compaction batch.
	compactBatchSize = 1000
)

// pollBatchSize is the row limit passed to each poll query.
const pollBatchSize = 500
2019-08-22 05:12:46 +00:00
// SQLLog is an event log stored in a SQL database and accessed through a
// server.Dialect. Writes go through Append; reads go through After, List and
// Count; Watch subscribes to batches produced by the background poller.
type SQLLog struct {
2021-08-30 20:43:25 +00:00
	// d performs all database access.
	d server.Dialect
2019-08-22 05:12:46 +00:00
	// broadcaster hands the poller's event batches to Watch subscribers
	// (Watch subscribes with startWatch as the start hook).
	broadcaster broadcaster.Broadcaster
	// ctx is assigned by Start and scopes the background compactor and poller.
	ctx context.Context
	// notify carries revision hints from Append, List and poll itself that wake
	// poll before its next one-second tick. All sends are non-blocking, so hints
	// are dropped when the buffer is full.
	notify chan int64
}
2021-08-30 20:43:25 +00:00
func New ( d server . Dialect ) * SQLLog {
2019-08-22 05:12:46 +00:00
l := & SQLLog {
d : d ,
notify : make ( chan int64 , 1024 ) ,
}
return l
}
2021-06-29 20:08:57 +00:00
func ( s * SQLLog ) Start ( ctx context . Context ) error {
2019-08-22 05:12:46 +00:00
s . ctx = ctx
2021-06-29 20:08:57 +00:00
return s . compactStart ( s . ctx )
2019-08-22 05:12:46 +00:00
}
2020-02-01 04:46:05 +00:00
// compactStart reconciles the compact_rev_key row(s) before compaction runs.
// If no such row exists it appends an empty one (Create: true). If exactly one
// exists it does nothing. If several exist (a historical bug), it keeps the one
// whose PrevKV.ModRevision is highest and deletes the rest in one serializable
// transaction.
//
// NOTE(review): scan() sets PrevKV to nil for rows with the create flag set.
// If every duplicate row still has create set, maxRev/maxID stay 0 and the
// delete loop removes every row that does not have ModRevision 0. Confirm how
// the dialect's SetCompactRevision updates this row before relying on the cleanup.
func (s *SQLLog) compactStart(ctx context.Context) error {
2020-10-28 07:38:43 +00:00
	logrus.Tracef("COMPACTSTART")
2020-02-01 04:46:05 +00:00
	rows, err := s.d.After(ctx, "compact_rev_key", 0, 0)
	if err != nil {
		return err
	}
	_, _, events, err := RowsToEvents(rows)
	if err != nil {
		return err
	}
2020-10-28 07:38:43 +00:00
	logrus.Tracef("COMPACTSTART len(events)=%v", len(events))
2020-02-01 04:46:05 +00:00
	if len(events) == 0 {
		// First start: create the bookkeeping row with an empty value.
		_, err := s.Append(ctx, &server.Event{
			Create: true,
			KV: &server.KeyValue{
				Key:   "compact_rev_key",
				Value: []byte(""),
			},
		})
		return err
	} else if len(events) == 1 {
		// Already consistent.
		return nil
	}
2020-10-28 07:38:43 +00:00
	t, err := s.d.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
	if err != nil {
		return err
	}
	// Undo the deletes on any early return (presumably a no-op after Commit).
	defer t.MustRollback()
2020-02-01 04:46:05 +00:00
	// this is to work around a bug in which we ended up with two compact_rev_key rows
	// Pick the row with the highest recorded previous revision as the survivor.
	maxRev := int64(0)
	maxID := int64(0)
	for _, event := range events {
		if event.PrevKV != nil && event.PrevKV.ModRevision > maxRev {
			maxRev = event.PrevKV.ModRevision
			maxID = event.KV.ModRevision
		}
2020-10-28 07:38:43 +00:00
		logrus.Tracef("COMPACTSTART maxRev=%v maxID=%v", maxRev, maxID)
2020-02-01 04:46:05 +00:00
	}
	// Delete every row except the survivor.
	for _, event := range events {
2020-10-28 07:38:43 +00:00
		logrus.Tracef("COMPACTSTART event.KV.ModRevision=%v maxID=%v", event.KV.ModRevision, maxID)
2020-02-01 04:46:05 +00:00
		if event.KV.ModRevision == maxID {
			continue
		}
2020-10-28 07:38:43 +00:00
		if err := t.DeleteRevision(ctx, event.KV.ModRevision); err != nil {
2020-02-01 04:46:05 +00:00
			return err
		}
	}
2020-10-28 07:38:43 +00:00
	return t.Commit()
2020-02-01 04:46:05 +00:00
}
2020-10-28 07:38:43 +00:00
// compactor periodically compacts historical versions of keys.
// It will compact keys with versions older than given interval, but never within the last 1000 revisions.
// In other words, after compaction, it will only contain key revisions set during last interval.
// Any API call for the older versions of keys will return error.
// Interval is the time interval between each compaction. The first compaction happens after "interval".
// This logic is directly cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go
func ( s * SQLLog ) compactor ( interval time . Duration ) {
t := time . NewTicker ( interval )
compactRev , _ := s . d . GetCompactRevision ( s . ctx )
targetCompactRev , _ := s . d . CurrentRevision ( s . ctx )
logrus . Tracef ( "COMPACT starting compactRev=%d targetCompactRev=%d" , compactRev , targetCompactRev )
2019-08-22 05:12:46 +00:00
outer :
for {
select {
case <- s . ctx . Done ( ) :
return
case <- t . C :
}
2020-10-28 07:38:43 +00:00
// Break up the compaction into smaller batches to avoid locking the database with excessively
// long transactions. When things are working normally deletes should proceed quite quickly, but if
// run against a database where compaction has stalled (see rancher/k3s#1311) it may take a long time
// (several hundred ms) just for the database to execute the subquery to select the revisions to delete.
2019-11-02 00:05:00 +00:00
2020-10-28 07:38:43 +00:00
var (
iterCompactRev int64
compactedRev int64
currentRev int64
err error
)
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
iterCompactRev = compactRev
compactedRev = compactRev
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
for iterCompactRev < targetCompactRev {
// Set move iteration target compactBatchSize revisions forward, or
// just as far as we need to hit the compaction target if that would
// overshoot it.
iterCompactRev += compactBatchSize
if iterCompactRev > targetCompactRev {
iterCompactRev = targetCompactRev
2019-08-22 05:12:46 +00:00
}
2020-10-28 07:38:43 +00:00
compactedRev , currentRev , err = s . compact ( compactedRev , iterCompactRev )
2019-08-22 05:12:46 +00:00
if err != nil {
2020-10-28 07:38:43 +00:00
// ErrCompacted indicates that no further work is necessary - either compactRev changed since the
// last iteration because another client has compacted, or the requested revision has already been compacted.
if err == server . ErrCompacted {
break
} else {
logrus . Errorf ( "Compact failed: %v" , err )
continue outer
}
2019-08-22 05:12:46 +00:00
}
2020-10-28 07:38:43 +00:00
}
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
// Record the final results for the outer loop
compactRev = compactedRev
targetCompactRev = currentRev
}
}
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
// compact removes deleted or replaced rows from the database. compactRev is the revision that was last compacted to.
// If this changes between compactions, we know that someone else has compacted and we don't need to do it.
// targetCompactRev is the revision that we should try to compact to. Upon success, the function returns the revision
// compacted to, and the revision that we should try to compact to next time (the current revision).
// This logic is directly cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go
func ( s * SQLLog ) compact ( compactRev int64 , targetCompactRev int64 ) ( int64 , int64 , error ) {
ctx , cancel := context . WithTimeout ( s . ctx , compactTimeout )
defer cancel ( )
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
t , err := s . d . BeginTx ( ctx , & sql . TxOptions { Isolation : sql . LevelSerializable } )
if err != nil {
return compactRev , targetCompactRev , errors . Wrap ( err , "failed to begin transaction" )
}
defer t . MustRollback ( )
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
currentRev , err := t . CurrentRevision ( s . ctx )
if err != nil {
return compactRev , targetCompactRev , errors . Wrap ( err , "failed to get current revision" )
}
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
dbCompactRev , err := t . GetCompactRevision ( s . ctx )
if err != nil {
return compactRev , targetCompactRev , errors . Wrap ( err , "failed to get compact revision" )
}
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
if compactRev != dbCompactRev {
logrus . Tracef ( "COMPACT compact revision changed since last iteration: %d => %d" , compactRev , dbCompactRev )
return dbCompactRev , currentRev , server . ErrCompacted
}
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
// Ensure that we never compact the most recent 1000 revisions
targetCompactRev = safeCompactRev ( targetCompactRev , currentRev )
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
// Don't bother compacting to a revision that has already been compacted
if targetCompactRev <= compactRev {
logrus . Tracef ( "COMPACT revision %d has already been compacted" , targetCompactRev )
return dbCompactRev , currentRev , server . ErrCompacted
}
logrus . Tracef ( "COMPACT compactRev=%d targetCompactRev=%d currentRev=%d" , compactRev , targetCompactRev , currentRev )
start := time . Now ( )
deletedRows , err := t . Compact ( s . ctx , targetCompactRev )
if err != nil {
return compactRev , targetCompactRev , errors . Wrapf ( err , "failed to compact to revision %d" , targetCompactRev )
2019-08-22 05:12:46 +00:00
}
2020-10-28 07:38:43 +00:00
if err := t . SetCompactRevision ( s . ctx , targetCompactRev ) ; err != nil {
return compactRev , targetCompactRev , errors . Wrap ( err , "failed to record compact revision" )
}
t . MustCommit ( )
logrus . Debugf ( "COMPACT deleted %d rows from %d revisions in %s - compacted to %d/%d" , deletedRows , ( targetCompactRev - compactRev ) , time . Since ( start ) , targetCompactRev , currentRev )
return targetCompactRev , currentRev , nil
2019-08-22 05:12:46 +00:00
}
// CurrentRevision returns the current revision as reported by the dialect.
func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
	rev, err := s.d.CurrentRevision(ctx)
	return rev, err
}
2019-11-14 00:45:05 +00:00
func ( s * SQLLog ) After ( ctx context . Context , prefix string , revision , limit int64 ) ( int64 , [ ] * server . Event , error ) {
2019-08-22 05:12:46 +00:00
if strings . HasSuffix ( prefix , "/" ) {
prefix += "%"
}
2019-11-14 00:45:05 +00:00
rows , err := s . d . After ( ctx , prefix , revision , limit )
2019-08-22 05:12:46 +00:00
if err != nil {
return 0 , nil , err
}
2020-01-20 22:43:58 +00:00
rev , compact , result , err := RowsToEvents ( rows )
if revision > 0 && revision < compact {
return rev , result , server . ErrCompacted
}
2019-08-22 05:12:46 +00:00
return rev , result , err
}
func ( s * SQLLog ) List ( ctx context . Context , prefix , startKey string , limit , revision int64 , includeDeleted bool ) ( int64 , [ ] * server . Event , error ) {
var (
rows * sql . Rows
err error
)
2019-12-17 05:20:00 +00:00
// It's assumed that when there is a start key that that key exists.
2019-08-22 05:12:46 +00:00
if strings . HasSuffix ( prefix , "/" ) {
2019-12-17 05:20:00 +00:00
// In the situation of a list start the startKey will not exist so set to ""
if prefix == startKey {
startKey = ""
}
2019-08-22 05:12:46 +00:00
prefix += "%"
2019-12-17 05:20:00 +00:00
} else {
// Also if this isn't a list there is no reason to pass startKey
startKey = ""
2019-08-22 05:12:46 +00:00
}
if revision == 0 {
rows , err = s . d . ListCurrent ( ctx , prefix , limit , includeDeleted )
} else {
rows , err = s . d . List ( ctx , prefix , startKey , limit , revision , includeDeleted )
}
if err != nil {
return 0 , nil , err
}
rev , compact , result , err := RowsToEvents ( rows )
if err != nil {
return 0 , nil , err
}
if revision > 0 && len ( result ) == 0 {
// a zero length result won't have the compact revision so get it manually
compact , err = s . d . GetCompactRevision ( ctx )
if err != nil {
return 0 , nil , err
}
}
if revision > 0 && revision < compact {
return rev , result , server . ErrCompacted
}
select {
case s . notify <- rev :
default :
}
return rev , result , err
}
// RowsToEvents decodes every row of rows into an event and always closes rows.
// It returns the revision and compact revision read from the last row (both 0
// when there are no rows) and the events in row order. A scan failure or an
// iteration failure reported by rows.Err aborts with that error.
func RowsToEvents(rows *sql.Rows) (int64, int64, []*server.Event, error) {
	defer rows.Close()

	var (
		result  []*server.Event
		rev     int64
		compact int64
	)
	for rows.Next() {
		event := &server.Event{}
		if err := scan(rows, &rev, &compact, event); err != nil {
			return 0, 0, nil, err
		}
		result = append(result, event)
	}
	// rows.Next returns false both at the end of the result set and when
	// iteration fails (e.g. a dropped connection). Only rows.Err tells them
	// apart; without this check a truncated result is returned as complete.
	if err := rows.Err(); err != nil {
		return 0, 0, nil, err
	}
	return rev, compact, result, nil
}
func ( s * SQLLog ) Watch ( ctx context . Context , prefix string ) <- chan [ ] * server . Event {
2019-11-02 00:05:00 +00:00
res := make ( chan [ ] * server . Event , 100 )
2019-08-22 05:12:46 +00:00
values , err := s . broadcaster . Subscribe ( ctx , s . startWatch )
if err != nil {
return nil
}
checkPrefix := strings . HasSuffix ( prefix , "/" )
go func ( ) {
defer close ( res )
for i := range values {
events , ok := filter ( i , checkPrefix , prefix )
if ok {
res <- events
}
}
} ( )
return res
}
// filter asserts events to []*server.Event (panicking on any other type) and
// keeps the events whose key equals prefix or, when checkPrefix is set, starts
// with prefix. The bool reports whether anything was kept.
func filter(events interface{}, checkPrefix bool, prefix string) ([]*server.Event, bool) {
	all := events.([]*server.Event)
	kept := make([]*server.Event, 0, len(all))
	for _, ev := range all {
		key := ev.KV.Key
		if key == prefix || (checkPrefix && strings.HasPrefix(key, prefix)) {
			kept = append(kept, ev)
		}
	}
	return kept, len(kept) > 0
}
func ( s * SQLLog ) startWatch ( ) ( chan interface { } , error ) {
2019-11-08 21:45:10 +00:00
pollStart , err := s . d . GetCompactRevision ( s . ctx )
if err != nil {
return nil , err
}
2019-08-22 05:12:46 +00:00
c := make ( chan interface { } )
2019-11-08 21:45:10 +00:00
// start compaction and polling at the same time to watch starts
// at the oldest revision, but compaction doesn't create gaps
2020-10-28 07:38:43 +00:00
go s . compactor ( compactInterval )
2019-11-08 21:45:10 +00:00
go s . poll ( c , pollStart )
2019-08-22 05:12:46 +00:00
return c , nil
}
2019-11-08 21:45:10 +00:00
// poll is the producer behind Watch. It repeatedly reads rows newer than the
// last delivered revision (at most pollBatchSize per query) and sends them on
// result as one []*server.Event per query, in strictly sequential revision
// order. Fill rows (s.d.IsFill) advance the position but are not delivered.
// When s.ctx is cancelled it returns and closes result.
//
// pollStart is the revision to start after; startWatch passes the compact revision.
func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
2019-08-22 05:12:46 +00:00
	var (
2019-11-14 00:45:05 +00:00
		// last is the highest revision already delivered (or skipped past).
		last = pollStart
		// skip is the missing revision currently being waited on and skipTime
		// is when that gap was first seen.
		skip     int64
		skipTime time.Time
		// waitForMore is false when the previous query suggested more rows are
		// pending, so the next query runs without waiting.
		waitForMore = true
2019-08-22 05:12:46 +00:00
	)
2019-11-02 00:05:00 +00:00
	// Fallback wake-up: query at least once a second even without notify hints.
	wait := time.NewTicker(time.Second)
2019-08-22 05:12:46 +00:00
	defer wait.Stop()
	defer close(result)
	for {
2019-11-14 00:45:05 +00:00
		if waitForMore {
			select {
			case <-s.ctx.Done():
				return
			case check := <-s.notify:
				// A hint at or below last carries nothing new; keep waiting.
				if check <= last {
					continue
				}
			case <-wait.C:
2019-08-22 05:12:46 +00:00
			}
		}
2019-11-14 00:45:05 +00:00
		waitForMore = true
2019-08-22 05:12:46 +00:00
2020-10-28 07:38:43 +00:00
		rows, err := s.d.After(s.ctx, "%", last, pollBatchSize)
2019-08-22 05:12:46 +00:00
		if err != nil {
			logrus.Errorf("fail to list latest changes: %v", err)
			continue
		}
2019-11-02 00:05:00 +00:00
		_, _, events, err := RowsToEvents(rows)
2019-08-22 05:12:46 +00:00
		if err != nil {
			logrus.Errorf("fail to convert rows changes: %v", err)
			continue
		}
		if len(events) == 0 {
			continue
		}
2019-11-14 00:45:05 +00:00
		// NOTE(review): the "more rows pending" threshold is 100, while each query
		// may return up to pollBatchSize (500) rows. Confirm whether this was
		// meant to be pollBatchSize.
		waitForMore = len(events) < 100
2019-11-02 00:05:00 +00:00
		rev := last
		var (
			// sequential collects the deliverable (non-fill) events of this query.
			sequential []*server.Event
			// saveLast records that at least one event was accepted.
			saveLast bool
		)
2019-08-22 05:12:46 +00:00
		for _, event := range events {
2019-11-02 00:05:00 +00:00
			next := rev + 1
			// Ensure that we are notifying events in a sequential fashion. For example if we find row 4 before 3
			// we don't want to notify row 4 because 3 is essentially dropped forever.
			if event.KV.ModRevision != next {
2020-10-28 07:38:43 +00:00
				logrus.Tracef("MODREVISION GAP: expected %v, got %v", next, event.KV.ModRevision)
2019-11-02 00:05:00 +00:00
				if canSkipRevision(next, skip, skipTime) {
					// This situation should never happen, but we have it here as a fallback just for unknown reasons
					// we don't want to pause all watches forever
					// The gap is abandoned: control falls through and this event is accepted.
					logrus.Errorf("GAP %s, revision=%d, delete=%v, next=%d", event.KV.Key, event.KV.ModRevision, event.Delete, next)
				} else if skip != next {
					// This is the first time we have encountered this missing revision, so record time start
					// and trigger a quick retry for simple out of order events
					skip = next
					skipTime = time.Now()
					select {
					case s.notify <- next:
					default:
					}
					// Stop at the gap. last is not advanced past it, so the remaining
					// events are read again by the next query.
					break
				} else {
					// Gap seen before but not yet old enough to skip: ask the dialect
					// to fill it (presumably by inserting a placeholder row) and, on
					// success, wake poll again.
					if err := s.d.Fill(s.ctx, next); err == nil {
2020-10-15 17:34:24 +00:00
						logrus.Tracef("FILL, revision=%d, err=%v", next, err)
2019-11-02 00:05:00 +00:00
						select {
						case s.notify <- next:
						default:
						}
					} else {
2020-10-15 17:34:24 +00:00
						logrus.Tracef("FILL FAILED, revision=%d, err=%v", next, err)
2019-11-02 00:05:00 +00:00
					}
					break
				}
			}
			// we have done something now that we should save the last revision. We don't save here now because
			// the next loop could fail leading to saving the reported revision without reporting it. In practice this
			// loop right now has no error exit so the next loop shouldn't fail, but if we for some reason add a method
			// that returns error, that would be a tricky bug to find. So instead we only save the last revision at
			// the same time we write to the channel.
			saveLast = true
			rev = event.KV.ModRevision
			if s.d.IsFill(event.KV.Key) {
2020-10-15 17:34:24 +00:00
				logrus.Tracef("NOT TRIGGER FILL %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
2019-11-02 00:05:00 +00:00
			} else {
				sequential = append(sequential, event)
2020-10-15 17:34:24 +00:00
				logrus.Tracef("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
2019-11-02 00:05:00 +00:00
			}
2019-08-22 05:12:46 +00:00
		}
2019-11-02 00:05:00 +00:00
		if saveLast {
			last = rev
			// A batch of only fill rows advances last without sending anything.
			if len(sequential) > 0 {
				result <- sequential
			}
		}
2019-08-22 05:12:46 +00:00
	}
}
2019-11-02 00:05:00 +00:00
// canSkipRevision reports whether poll may stop waiting for the missing
// revision rev. This is true only when rev is the gap currently being tracked
// (skip) and that gap was first seen more than one second ago (skipTime).
func canSkipRevision(rev, skip int64, skipTime time.Time) bool {
	if rev != skip {
		return false
	}
	return time.Since(skipTime) > time.Second
}
2019-08-22 05:12:46 +00:00
// Count returns the dialect's count result for keys matching prefix. A prefix
// ending in "/" gets "%" appended before it is passed to the dialect.
func (s *SQLLog) Count(ctx context.Context, prefix string) (int64, int64, error) {
	pattern := prefix
	if strings.HasSuffix(pattern, "/") {
		pattern = pattern + "%"
	}
	return s.d.Count(ctx, pattern)
}
// Append inserts event through the dialect and returns the revision it
// assigned. A nil KV or PrevKV is treated as an empty KeyValue; the caller's
// event is not modified. On success the new revision is offered to the poller
// without blocking.
func (s *SQLLog) Append(ctx context.Context, event *server.Event) (int64, error) {
	kv, prevKV := event.KV, event.PrevKV
	if kv == nil {
		kv = &server.KeyValue{}
	}
	if prevKV == nil {
		prevKV = &server.KeyValue{}
	}

	rev, err := s.d.Insert(ctx, kv.Key,
		event.Create,
		event.Delete,
		kv.CreateRevision,
		prevKV.ModRevision,
		kv.Lease,
		kv.Value,
		prevKV.Value,
	)
	if err != nil {
		return 0, err
	}

	select {
	case s.notify <- rev:
	default:
	}
	return rev, nil
}
// scan decodes the current row of rows. Columns are read in a fixed order:
//   - into *rev;
//   - a nullable compact revision (NULL becomes 0) into *compact;
//   - then, into event, the mod revision, key, create flag, delete flag,
//     create revision, previous mod revision, lease, value and previous value.
//
// For create events, CreateRevision is set to the mod revision and PrevKV is
// cleared. On a scan error *compact is left untouched.
func scan(rows *sql.Rows, rev *int64, compact *int64, event *server.Event) error {
	kv := &server.KeyValue{}
	prev := &server.KeyValue{}
	event.KV, event.PrevKV = kv, prev

	var compactCol sql.NullInt64
	if err := rows.Scan(
		rev,
		&compactCol,
		&kv.ModRevision,
		&kv.Key,
		&event.Create,
		&event.Delete,
		&kv.CreateRevision,
		&prev.ModRevision,
		&kv.Lease,
		&kv.Value,
		&prev.Value,
	); err != nil {
		return err
	}

	if event.Create {
		kv.CreateRevision = kv.ModRevision
		event.PrevKV = nil
	}
	*compact = compactCol.Int64
	return nil
}
2020-10-28 07:38:43 +00:00
// safeCompactRev clamps a compaction target so the most recent
// compactMinRetain revisions are never compacted. It returns the smaller of
// targetCompactRev and currentRev-compactMinRetain, but never less than 0.
func safeCompactRev(targetCompactRev int64, currentRev int64) int64 {
	ceiling := currentRev - compactMinRetain
	rev := targetCompactRev
	if ceiling < rev {
		rev = ceiling
	}
	if rev < 0 {
		return 0
	}
	return rev
}
2021-08-30 20:43:25 +00:00
// DbSize returns the database size reported by the dialect's GetSize.
func (s *SQLLog) DbSize(ctx context.Context) (int64, error) {
	size, err := s.d.GetSize(ctx)
	return size, err
}