mirror of https://github.com/k3s-io/k3s
452 lines
11 KiB
Go
452 lines
11 KiB
Go
package sqllog
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/rancher/kine/pkg/broadcaster"
|
|
"github.com/rancher/kine/pkg/server"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type SQLLog struct {
|
|
d Dialect
|
|
broadcaster broadcaster.Broadcaster
|
|
ctx context.Context
|
|
notify chan int64
|
|
}
|
|
|
|
func New(d Dialect) *SQLLog {
|
|
l := &SQLLog{
|
|
d: d,
|
|
notify: make(chan int64, 1024),
|
|
}
|
|
return l
|
|
}
|
|
|
|
type Dialect interface {
|
|
ListCurrent(ctx context.Context, prefix string, limit int64, includeDeleted bool) (*sql.Rows, error)
|
|
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
|
|
Count(ctx context.Context, prefix string) (int64, int64, error)
|
|
CurrentRevision(ctx context.Context) (int64, error)
|
|
After(ctx context.Context, prefix string, rev int64) (*sql.Rows, error)
|
|
Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error)
|
|
GetRevision(ctx context.Context, revision int64) (*sql.Rows, error)
|
|
DeleteRevision(ctx context.Context, revision int64) error
|
|
GetCompactRevision(ctx context.Context) (int64, error)
|
|
SetCompactRevision(ctx context.Context, revision int64) error
|
|
Fill(ctx context.Context, revision int64) error
|
|
IsFill(key string) bool
|
|
}
|
|
|
|
func (s *SQLLog) Start(ctx context.Context) (err error) {
|
|
s.ctx = ctx
|
|
return
|
|
}
|
|
|
|
func (s *SQLLog) compact() {
|
|
var (
|
|
nextEnd int64
|
|
)
|
|
t := time.NewTicker(5 * time.Minute)
|
|
nextEnd, _ = s.d.CurrentRevision(s.ctx)
|
|
|
|
outer:
|
|
for {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
}
|
|
|
|
currentRev, err := s.d.CurrentRevision(s.ctx)
|
|
if err != nil {
|
|
logrus.Errorf("failed to get current revision: %v", err)
|
|
continue
|
|
}
|
|
|
|
end := nextEnd
|
|
nextEnd = currentRev
|
|
|
|
cursor, err := s.d.GetCompactRevision(s.ctx)
|
|
if err != nil {
|
|
logrus.Errorf("failed to get compact revision: %v", err)
|
|
continue
|
|
}
|
|
|
|
// leave the last 1000
|
|
end = end - 1000
|
|
|
|
savedCursor := cursor
|
|
// Purposefully start at the current and redo the current as
|
|
// it could have failed before actually compacting
|
|
for ; cursor <= end; cursor++ {
|
|
rows, err := s.d.GetRevision(s.ctx, cursor)
|
|
if err != nil {
|
|
logrus.Errorf("failed to get revision %d: %v", cursor, err)
|
|
continue outer
|
|
}
|
|
|
|
_, _, events, err := RowsToEvents(rows)
|
|
if err != nil {
|
|
logrus.Errorf("failed to convert to events: %v", err)
|
|
continue outer
|
|
}
|
|
|
|
if len(events) == 0 {
|
|
continue
|
|
}
|
|
|
|
event := events[0]
|
|
|
|
if event.KV.Key == "compact_rev_key" {
|
|
// don't compact the compact key
|
|
continue
|
|
}
|
|
|
|
setRev := false
|
|
if event.PrevKV != nil && event.PrevKV.ModRevision != 0 {
|
|
if savedCursor != cursor {
|
|
if err := s.d.SetCompactRevision(s.ctx, cursor); err != nil {
|
|
logrus.Errorf("failed to record compact revision: %v", err)
|
|
continue outer
|
|
}
|
|
savedCursor = cursor
|
|
setRev = true
|
|
}
|
|
|
|
if err := s.d.DeleteRevision(s.ctx, event.PrevKV.ModRevision); err != nil {
|
|
logrus.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err)
|
|
continue outer
|
|
}
|
|
}
|
|
|
|
if event.Delete {
|
|
if !setRev && savedCursor != cursor {
|
|
if err := s.d.SetCompactRevision(s.ctx, cursor); err != nil {
|
|
logrus.Errorf("failed to record compact revision: %v", err)
|
|
continue outer
|
|
}
|
|
savedCursor = cursor
|
|
}
|
|
|
|
if err := s.d.DeleteRevision(s.ctx, cursor); err != nil {
|
|
logrus.Errorf("failed to delete current revision %d: %v", cursor, err)
|
|
continue outer
|
|
}
|
|
}
|
|
}
|
|
|
|
if savedCursor != cursor {
|
|
if err := s.d.SetCompactRevision(s.ctx, cursor); err != nil {
|
|
logrus.Errorf("failed to record compact revision: %v", err)
|
|
continue outer
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
|
|
return s.d.CurrentRevision(ctx)
|
|
}
|
|
|
|
func (s *SQLLog) After(ctx context.Context, prefix string, revision int64) (int64, []*server.Event, error) {
|
|
if strings.HasSuffix(prefix, "/") {
|
|
prefix += "%"
|
|
}
|
|
|
|
rows, err := s.d.After(ctx, prefix, revision)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
|
|
rev, _, result, err := RowsToEvents(rows)
|
|
return rev, result, err
|
|
}
|
|
|
|
func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (int64, []*server.Event, error) {
|
|
var (
|
|
rows *sql.Rows
|
|
err error
|
|
)
|
|
|
|
if strings.HasSuffix(prefix, "/") {
|
|
prefix += "%"
|
|
}
|
|
|
|
if revision == 0 {
|
|
rows, err = s.d.ListCurrent(ctx, prefix, limit, includeDeleted)
|
|
} else {
|
|
rows, err = s.d.List(ctx, prefix, startKey, limit, revision, includeDeleted)
|
|
}
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
|
|
rev, compact, result, err := RowsToEvents(rows)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
|
|
if revision > 0 && len(result) == 0 {
|
|
// a zero length result won't have the compact revision so get it manually
|
|
compact, err = s.d.GetCompactRevision(ctx)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
}
|
|
|
|
if revision > 0 && revision < compact {
|
|
return rev, result, server.ErrCompacted
|
|
}
|
|
|
|
select {
|
|
case s.notify <- rev:
|
|
default:
|
|
}
|
|
|
|
return rev, result, err
|
|
}
|
|
|
|
func RowsToEvents(rows *sql.Rows) (int64, int64, []*server.Event, error) {
|
|
var (
|
|
result []*server.Event
|
|
rev int64
|
|
compact int64
|
|
)
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
event := &server.Event{}
|
|
if err := scan(rows, &rev, &compact, event); err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
result = append(result, event)
|
|
}
|
|
|
|
return rev, compact, result, nil
|
|
}
|
|
|
|
func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Event {
|
|
res := make(chan []*server.Event, 100)
|
|
values, err := s.broadcaster.Subscribe(ctx, s.startWatch)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
checkPrefix := strings.HasSuffix(prefix, "/")
|
|
|
|
go func() {
|
|
defer close(res)
|
|
for i := range values {
|
|
events, ok := filter(i, checkPrefix, prefix)
|
|
if ok {
|
|
res <- events
|
|
}
|
|
}
|
|
}()
|
|
|
|
return res
|
|
}
|
|
|
|
func filter(events interface{}, checkPrefix bool, prefix string) ([]*server.Event, bool) {
|
|
eventList := events.([]*server.Event)
|
|
filteredEventList := make([]*server.Event, 0, len(eventList))
|
|
|
|
for _, event := range eventList {
|
|
if (checkPrefix && strings.HasPrefix(event.KV.Key, prefix)) || event.KV.Key == prefix {
|
|
filteredEventList = append(filteredEventList, event)
|
|
}
|
|
}
|
|
|
|
return filteredEventList, len(filteredEventList) > 0
|
|
}
|
|
|
|
func (s *SQLLog) startWatch() (chan interface{}, error) {
|
|
pollStart, err := s.d.GetCompactRevision(s.ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c := make(chan interface{})
|
|
// start compaction and polling at the same time to watch starts
|
|
// at the oldest revision, but compaction doesn't create gaps
|
|
go s.compact()
|
|
go s.poll(c, pollStart)
|
|
return c, nil
|
|
}
|
|
|
|
func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
|
|
var (
|
|
last = pollStart
|
|
skip int64
|
|
skipTime time.Time
|
|
)
|
|
|
|
wait := time.NewTicker(time.Second)
|
|
defer wait.Stop()
|
|
defer close(result)
|
|
|
|
for {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return
|
|
case check := <-s.notify:
|
|
if check <= last {
|
|
continue
|
|
}
|
|
case <-wait.C:
|
|
}
|
|
|
|
rows, err := s.d.After(s.ctx, "%", last)
|
|
if err != nil {
|
|
logrus.Errorf("fail to list latest changes: %v", err)
|
|
continue
|
|
}
|
|
|
|
_, _, events, err := RowsToEvents(rows)
|
|
if err != nil {
|
|
logrus.Errorf("fail to convert rows changes: %v", err)
|
|
continue
|
|
}
|
|
|
|
if len(events) == 0 {
|
|
continue
|
|
}
|
|
|
|
rev := last
|
|
var (
|
|
sequential []*server.Event
|
|
saveLast bool
|
|
)
|
|
|
|
for _, event := range events {
|
|
next := rev + 1
|
|
// Ensure that we are notifying events in a sequential fashion. For example if we find row 4 before 3
|
|
// we don't want to notify row 4 because 3 is essentially dropped forever.
|
|
if event.KV.ModRevision != next {
|
|
if canSkipRevision(next, skip, skipTime) {
|
|
// This situation should never happen, but we have it here as a fallback just for unknown reasons
|
|
// we don't want to pause all watches forever
|
|
logrus.Errorf("GAP %s, revision=%d, delete=%v, next=%d", event.KV.Key, event.KV.ModRevision, event.Delete, next)
|
|
} else if skip != next {
|
|
// This is the first time we have encountered this missing revision, so record time start
|
|
// and trigger a quick retry for simple out of order events
|
|
skip = next
|
|
skipTime = time.Now()
|
|
select {
|
|
case s.notify <- next:
|
|
default:
|
|
}
|
|
break
|
|
} else {
|
|
if err := s.d.Fill(s.ctx, next); err == nil {
|
|
logrus.Debugf("FILL, revision=%d, err=%v", next, err)
|
|
select {
|
|
case s.notify <- next:
|
|
default:
|
|
}
|
|
} else {
|
|
logrus.Debugf("FILL FAILED, revision=%d, err=%v", next, err)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
// we have done something now that we should save the last revision. We don't save here now because
|
|
// the next loop could fail leading to saving the reported revision without reporting it. In practice this
|
|
// loop right now has no error exit so the next loop shouldn't fail, but if we for some reason add a method
|
|
// that returns error, that would be a tricky bug to find. So instead we only save the last revision at
|
|
// the same time we write to the channel.
|
|
saveLast = true
|
|
rev = event.KV.ModRevision
|
|
if s.d.IsFill(event.KV.Key) {
|
|
logrus.Debugf("NOT TRIGGER FILL %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
|
|
} else {
|
|
sequential = append(sequential, event)
|
|
logrus.Debugf("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
|
|
}
|
|
}
|
|
|
|
if saveLast {
|
|
last = rev
|
|
if len(sequential) > 0 {
|
|
result <- sequential
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func canSkipRevision(rev, skip int64, skipTime time.Time) bool {
|
|
return rev == skip && time.Now().Sub(skipTime) > time.Second
|
|
}
|
|
|
|
func (s *SQLLog) Count(ctx context.Context, prefix string) (int64, int64, error) {
|
|
if strings.HasSuffix(prefix, "/") {
|
|
prefix += "%"
|
|
}
|
|
return s.d.Count(ctx, prefix)
|
|
}
|
|
|
|
func (s *SQLLog) Append(ctx context.Context, event *server.Event) (int64, error) {
|
|
e := *event
|
|
if e.KV == nil {
|
|
e.KV = &server.KeyValue{}
|
|
}
|
|
if e.PrevKV == nil {
|
|
e.PrevKV = &server.KeyValue{}
|
|
}
|
|
|
|
rev, err := s.d.Insert(ctx, e.KV.Key,
|
|
e.Create,
|
|
e.Delete,
|
|
e.KV.CreateRevision,
|
|
e.PrevKV.ModRevision,
|
|
e.KV.Lease,
|
|
e.KV.Value,
|
|
e.PrevKV.Value,
|
|
)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
select {
|
|
case s.notify <- rev:
|
|
default:
|
|
}
|
|
return rev, nil
|
|
}
|
|
|
|
func scan(rows *sql.Rows, rev *int64, compact *int64, event *server.Event) error {
|
|
event.KV = &server.KeyValue{}
|
|
event.PrevKV = &server.KeyValue{}
|
|
|
|
c := &sql.NullInt64{}
|
|
|
|
err := rows.Scan(
|
|
rev,
|
|
c,
|
|
&event.KV.ModRevision,
|
|
&event.KV.Key,
|
|
&event.Create,
|
|
&event.Delete,
|
|
&event.KV.CreateRevision,
|
|
&event.PrevKV.ModRevision,
|
|
&event.KV.Lease,
|
|
&event.KV.Value,
|
|
&event.PrevKV.Value,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if event.Create {
|
|
event.KV.CreateRevision = event.KV.ModRevision
|
|
event.PrevKV = nil
|
|
}
|
|
|
|
*compact = c.Int64
|
|
return nil
|
|
}
|