Merge pull request #2396 from briandowns/issue-831

Update kine to v0.4.1
pull/2383/head
Brian Downs 2020-10-15 13:39:08 -07:00 committed by GitHub
commit 0063646628
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 76 additions and 50 deletions

2
go.mod
View File

@ -88,7 +88,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/rancher/dynamiclistener v0.2.1
github.com/rancher/helm-controller v0.7.3
github.com/rancher/kine v0.4.0
github.com/rancher/kine v0.4.1
github.com/rancher/remotedialer v0.2.0
github.com/rancher/wrangler v0.6.1
github.com/rancher/wrangler-api v0.6.0

4
go.sum
View File

@ -677,8 +677,8 @@ github.com/rancher/go-powershell v0.0.0-20200701184732-233247d45373 h1:BePi97poJ
github.com/rancher/go-powershell v0.0.0-20200701184732-233247d45373/go.mod h1:Vz8oLnHgttpo/aZrTpjbcpZEDzzElqNau2zmorToY0E=
github.com/rancher/helm-controller v0.7.3 h1:WTQHcNF2vl9w6Xd1eBtXDe0JUsYMFFstqX9ghGhI5Ac=
github.com/rancher/helm-controller v0.7.3/go.mod h1:ZylsxIMGNADRPRNW+NiBWhrwwks9vnKLQiCHYWb6Bi0=
github.com/rancher/kine v0.4.0 h1:1IhWy3TzjExG8xnj46eyUEWdzqNAD1WrgL4eEBKm6Uc=
github.com/rancher/kine v0.4.0/go.mod h1:IImtCJ68AIkE+VY/kUI0NkyJL5q5WzO8QvMsSXqbrpA=
github.com/rancher/kine v0.4.1 h1:CPtGDXsov5t5onXwhZ97VBpaxDoj1MBHeQwB0TSrUu8=
github.com/rancher/kine v0.4.1/go.mod h1:IImtCJ68AIkE+VY/kUI0NkyJL5q5WzO8QvMsSXqbrpA=
github.com/rancher/kubernetes v1.19.2-k3s1 h1:/oTv57BwDcf8kapnr1ViYH98Fwk3vnklWmQdlI3vJE0=
github.com/rancher/kubernetes v1.19.2-k3s1/go.mod h1:yhT1/ltQajQsha3tnYc9QPFYSumGM45nlZdjf7WqE1A=
github.com/rancher/kubernetes/staging/src/k8s.io/api v1.19.2-k3s1 h1:OPBCfsjKfgMaMt0mtWaoy+IirLeD+/CeVxoHXdP5bTE=

View File

@ -15,6 +15,7 @@ import (
"github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/driver"
"github.com/pkg/errors"
"github.com/rancher/kine/pkg/drivers/generic"
"github.com/rancher/kine/pkg/drivers/sqlite"
"github.com/rancher/kine/pkg/server"
"github.com/sirupsen/logrus"
@ -66,7 +67,7 @@ outer:
return nil
}
func New(ctx context.Context, datasourceName string) (server.Backend, error) {
func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
opts, err := parseOpts(datasourceName)
if err != nil {
return nil, err
@ -95,7 +96,7 @@ func New(ctx context.Context, datasourceName string) (server.Backend, error) {
}
sql.Register("dqlite", d)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn)
backend, generic, err := sqlite.NewVariant(ctx, "dqlite", opts.dsn, connPoolConfig)
if err != nil {
return nil, errors.Wrap(err, "sqlite client")
}

View File

@ -4,11 +4,12 @@ package dqlite
import (
"context"
"fmt"
"errors"
"github.com/rancher/kine/pkg/drivers/generic"
"github.com/rancher/kine/pkg/server"
)
func New(ctx context.Context, datasourceName string) (server.Backend, error) {
return nil, fmt.Errorf("dqlite is not support, compile with \"-tags dqlite\"")
func New(ctx context.Context, datasourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
return nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`)
}

View File

@ -15,6 +15,10 @@ import (
"github.com/sirupsen/logrus"
)
const (
defaultMaxIdleConns = 2 // copied from database/sql
)
var (
columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"
revSQL = `
@ -64,6 +68,12 @@ func (s Stripped) String() string {
type ErrRetry func(error) bool
type TranslateErr func(error) error
type ConnectionPoolConfig struct {
MaxIdle int // zero means defaultMaxIdleConns; negative means 0
MaxOpen int // <= 0 means unlimited
MaxLifetime time.Duration // maximum amount of time a connection may be reused
}
type Generic struct {
sync.Mutex
@ -128,6 +138,20 @@ func (d *Generic) Migrate(ctx context.Context) {
}
}
func configureConnectionPooling(connPoolConfig ConnectionPoolConfig, db *sql.DB) {
// behavior copied from database/sql - zero means defaultMaxIdleConns; negative means 0
if connPoolConfig.MaxIdle < 0 {
connPoolConfig.MaxIdle = 0
} else if connPoolConfig.MaxIdle == 0 {
connPoolConfig.MaxIdle = defaultMaxIdleConns
}
logrus.Infof("Configuring DB connection pooling: maxIdleConns=%d, maxOpenConns=%d, connMaxLifetime=%s", connPoolConfig.MaxIdle, connPoolConfig.MaxOpen, connPoolConfig.MaxLifetime)
db.SetMaxIdleConns(connPoolConfig.MaxIdle)
db.SetMaxOpenConns(connPoolConfig.MaxOpen)
db.SetConnMaxLifetime(connPoolConfig.MaxLifetime)
}
func openAndTest(driverName, dataSourceName string) (*sql.DB, error) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
@ -144,7 +168,7 @@ func openAndTest(driverName, dataSourceName string) (*sql.DB, error) {
return db, nil
}
func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter string, numbered bool) (*Generic, error) {
func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig ConnectionPoolConfig, paramCharacter string, numbered bool) (*Generic, error) {
var (
db *sql.DB
err error
@ -164,6 +188,8 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter
}
}
configureConnectionPooling(connPoolConfig, db)
return &Generic{
DB: db,
@ -229,11 +255,7 @@ func (d *Generic) execute(ctx context.Context, sql string, args ...interface{})
wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond))
for i := uint(0); i < 20; i++ {
if i > 2 {
logrus.Debugf("EXEC (try: %d) %v : %s", i, args, Stripped(sql))
} else {
logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql))
}
logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql))
result, err = d.DB.ExecContext(ctx, sql, args...)
if err != nil && d.Retry != nil && d.Retry(err) {
wait(i)

View File

@ -40,7 +40,7 @@ var (
createDB = "create database if not exists "
)
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
@ -59,7 +59,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server
return nil, err
}
dialect, err := generic.Open(ctx, "mysql", parsedDSN, "?", false)
dialect, err := generic.Open(ctx, "mysql", parsedDSN, connPoolConfig, "?", false)
if err != nil {
return nil, err
}

View File

@ -41,7 +41,7 @@ var (
createDB = "create database "
)
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
parsedDSN, err := prepareDSN(dataSourceName, tlsInfo)
if err != nil {
return nil, err
@ -51,7 +51,7 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config) (server
return nil, err
}
dialect, err := generic.Open(ctx, "postgres", parsedDSN, "$", true)
dialect, err := generic.Open(ctx, "postgres", parsedDSN, connPoolConfig, "$", true)
if err != nil {
return nil, err
}

View File

@ -39,12 +39,12 @@ var (
}
)
func New(ctx context.Context, dataSourceName string) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName)
func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
backend, _, err := NewVariant(ctx, "sqlite3", dataSourceName, connPoolConfig)
return backend, err
}
func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.Backend, *generic.Generic, error) {
func NewVariant(ctx context.Context, driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) {
if dataSourceName == "" {
if err := os.MkdirAll("./db", 0700); err != nil {
return nil, nil, err
@ -52,7 +52,7 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string) (server.
dataSourceName = "./db/state.db?_journal=WAL&cache=shared"
}
dialect, err := generic.Open(ctx, driverName, dataSourceName, "?", false)
dialect, err := generic.Open(ctx, driverName, dataSourceName, connPoolConfig, "?", false)
if err != nil {
return nil, nil, err
}

View File

@ -13,11 +13,11 @@ import (
var errNoCgo = errors.New("this binary is built without CGO, sqlite is disabled")
func New(ctx context.Context, dataSourceName string) (server.Backend, error) {
func New(ctx context.Context, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
return nil, errNoCgo
}
func NewVariant(driverName, dataSourceName string) (server.Backend, *generic.Generic, error) {
func NewVariant(driverName, dataSourceName string, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, *generic.Generic, error) {
return nil, nil, errNoCgo
}

View File

@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
"github.com/rancher/kine/pkg/drivers/dqlite"
"github.com/rancher/kine/pkg/drivers/generic"
"github.com/rancher/kine/pkg/drivers/mysql"
"github.com/rancher/kine/pkg/drivers/pgsql"
"github.com/rancher/kine/pkg/drivers/sqlite"
@ -28,9 +29,10 @@ const (
)
type Config struct {
GRPCServer *grpc.Server
Listener string
Endpoint string
GRPCServer *grpc.Server
Listener string
Endpoint string
ConnectionPoolConfig generic.ConnectionPoolConfig
tls.Config
}
@ -124,13 +126,13 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config)
switch driver {
case SQLiteBackend:
leaderElect = false
backend, err = sqlite.New(ctx, dsn)
backend, err = sqlite.New(ctx, dsn, cfg.ConnectionPoolConfig)
case DQLiteBackend:
backend, err = dqlite.New(ctx, dsn)
backend, err = dqlite.New(ctx, dsn, cfg.ConnectionPoolConfig)
case PostgresBackend:
backend, err = pgsql.New(ctx, dsn, cfg.Config)
backend, err = pgsql.New(ctx, dsn, cfg.Config, cfg.ConnectionPoolConfig)
case MySQLBackend:
backend, err = mysql.New(ctx, dsn, cfg.Config)
backend, err = mysql.New(ctx, dsn, cfg.Config, cfg.ConnectionPoolConfig)
default:
return false, nil, fmt.Errorf("storage backend is not defined")
}

View File

@ -41,7 +41,7 @@ func (l *LogStructured) Start(ctx context.Context) error {
func (l *LogStructured) Get(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, errRet error) {
defer func() {
l.adjustRevision(ctx, &revRet)
logrus.Debugf("GET %s, rev=%d => rev=%d, kv=%v, err=%v", key, revision, revRet, kvRet != nil, errRet)
logrus.Tracef("GET %s, rev=%d => rev=%d, kv=%v, err=%v", key, revision, revRet, kvRet != nil, errRet)
}()
rev, event, err := l.get(ctx, key, revision, false)
@ -82,7 +82,7 @@ func (l *LogStructured) adjustRevision(ctx context.Context, rev *int64) {
func (l *LogStructured) Create(ctx context.Context, key string, value []byte, lease int64) (revRet int64, errRet error) {
defer func() {
l.adjustRevision(ctx, &revRet)
logrus.Debugf("CREATE %s, size=%d, lease=%d => rev=%d, err=%v", key, len(value), lease, revRet, errRet)
logrus.Tracef("CREATE %s, size=%d, lease=%d => rev=%d, err=%v", key, len(value), lease, revRet, errRet)
}()
rev, prevEvent, err := l.get(ctx, key, 0, true)
@ -114,7 +114,7 @@ func (l *LogStructured) Create(ctx context.Context, key string, value []byte, le
func (l *LogStructured) Delete(ctx context.Context, key string, revision int64) (revRet int64, kvRet *server.KeyValue, deletedRet bool, errRet error) {
defer func() {
l.adjustRevision(ctx, &revRet)
logrus.Debugf("DELETE %s, rev=%d => rev=%d, kv=%v, deleted=%v, err=%v", key, revision, revRet, kvRet != nil, deletedRet, errRet)
logrus.Tracef("DELETE %s, rev=%d => rev=%d, kv=%v, deleted=%v, err=%v", key, revision, revRet, kvRet != nil, deletedRet, errRet)
}()
rev, event, err := l.get(ctx, key, 0, true)
@ -155,7 +155,7 @@ func (l *LogStructured) Delete(ctx context.Context, key string, revision int64)
func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit, revision int64) (revRet int64, kvRet []*server.KeyValue, errRet error) {
defer func() {
logrus.Debugf("LIST %s, start=%s, limit=%d, rev=%d => rev=%d, kvs=%d, err=%v", prefix, startKey, limit, revision, revRet, len(kvRet), errRet)
logrus.Tracef("LIST %s, start=%s, limit=%d, rev=%d => rev=%d, kvs=%d, err=%v", prefix, startKey, limit, revision, revRet, len(kvRet), errRet)
}()
rev, events, err := l.log.List(ctx, prefix, startKey, limit, revision, false)
@ -185,7 +185,7 @@ func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit
func (l *LogStructured) Count(ctx context.Context, prefix string) (revRet int64, count int64, err error) {
defer func() {
logrus.Debugf("COUNT %s => rev=%d, count=%d, err=%v", prefix, revRet, count, err)
logrus.Tracef("COUNT %s => rev=%d, count=%d, err=%v", prefix, revRet, count, err)
}()
rev, count, err := l.log.Count(ctx, prefix)
if err != nil {
@ -211,7 +211,7 @@ func (l *LogStructured) Update(ctx context.Context, key string, value []byte, re
if kvRet != nil {
kvRev = kvRet.ModRevision
}
logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRev, updateRet, errRet)
logrus.Tracef("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRev, updateRet, errRet)
}()
rev, event, err := l.get(ctx, key, 0, false)
@ -311,7 +311,7 @@ func (l *LogStructured) ttl(ctx context.Context) {
}
func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64) <-chan []*server.Event {
logrus.Debugf("WATCH %s, revision=%d", prefix, revision)
logrus.Tracef("WATCH %s, revision=%d", prefix, revision)
// starting watching right away so we don't miss anything
ctx, cancel := context.WithCancel(ctx)
@ -319,7 +319,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
// include the current revision in list
if revision > 0 {
revision -= 1
revision--
}
result := make(chan []*server.Event, 100)
@ -330,7 +330,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
cancel()
}
logrus.Debugf("WATCH LIST key=%s rev=%d => rev=%d kvs=%d", prefix, revision, rev, len(kvs))
logrus.Tracef("WATCH LIST key=%s rev=%d => rev=%d kvs=%d", prefix, revision, rev, len(kvs))
go func() {
lastRevision := revision

View File

@ -411,13 +411,13 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
break
} else {
if err := s.d.Fill(s.ctx, next); err == nil {
logrus.Debugf("FILL, revision=%d, err=%v", next, err)
logrus.Tracef("FILL, revision=%d, err=%v", next, err)
select {
case s.notify <- next:
default:
}
} else {
logrus.Debugf("FILL FAILED, revision=%d, err=%v", next, err)
logrus.Tracef("FILL FAILED, revision=%d, err=%v", next, err)
}
break
}
@ -431,10 +431,10 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
saveLast = true
rev = event.KV.ModRevision
if s.d.IsFill(event.KV.Key) {
logrus.Debugf("NOT TRIGGER FILL %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
logrus.Tracef("NOT TRIGGER FILL %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
} else {
sequential = append(sequential, event)
logrus.Debugf("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
logrus.Tracef("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete)
}
}

View File

@ -31,7 +31,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
if msg.GetCreateRequest() != nil {
w.Start(ws.Context(), msg.GetCreateRequest())
} else if msg.GetCancelRequest() != nil {
logrus.Debugf("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId())
logrus.Tracef("WATCH CANCEL REQ id=%d", msg.GetCancelRequest().GetWatchId())
w.Cancel(msg.GetCancelRequest().WatchId, nil)
}
}
@ -58,7 +58,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
key := string(r.Key)
logrus.Debugf("WATCH START id=%d, count=%d, key=%s, revision=%d", id, len(w.watches), key, r.StartRevision)
logrus.Tracef("WATCH START id=%d, count=%d, key=%s, revision=%d", id, len(w.watches), key, r.StartRevision)
go func() {
defer w.wg.Done()
@ -78,7 +78,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
if logrus.IsLevelEnabled(logrus.DebugLevel) {
for _, event := range events {
logrus.Debugf("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision)
logrus.Tracef("WATCH READ id=%d, key=%s, revision=%d", id, event.KV.Key, event.KV.ModRevision)
}
}
@ -92,7 +92,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
}
}
w.Cancel(id, nil)
logrus.Debugf("WATCH CLOSE id=%d, key=%s", id, key)
logrus.Tracef("WATCH CLOSE id=%d, key=%s", id, key)
}()
}
@ -130,7 +130,7 @@ func (w *watcher) Cancel(watchID int64, err error) {
if err != nil {
reason = err.Error()
}
logrus.Debugf("WATCH CANCEL id=%d reason=%s", watchID, reason)
logrus.Tracef("WATCH CANCEL id=%d reason=%s", watchID, reason)
serr := w.server.Send(&etcdserverpb.WatchResponse{
Header: &etcdserverpb.ResponseHeader{},
Canceled: true,

2
vendor/modules.txt vendored
View File

@ -836,7 +836,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm
github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces
github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1
github.com/rancher/helm-controller/pkg/helm
# github.com/rancher/kine v0.4.0
# github.com/rancher/kine v0.4.1
## explicit
github.com/rancher/kine/pkg/broadcaster
github.com/rancher/kine/pkg/client