Update kine to v0.5.0

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/2457/head
Brad Davidson 2020-10-28 00:38:43 -07:00 committed by Brad Davidson
parent 68339ae00c
commit 8cdaf52980
45 changed files with 23263 additions and 16901 deletions

6
go.mod

@ -79,8 +79,8 @@ require (
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.1
github.com/kubernetes-sigs/cri-tools v0.0.0-00010101000000-000000000000
github.com/lib/pq v1.1.1
github.com/mattn/go-sqlite3 v1.13.0
github.com/lib/pq v1.8.0
github.com/mattn/go-sqlite3 v1.14.4
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/opencontainers/runc v1.0.0-rc92
github.com/opencontainers/selinux v1.6.0
@ -88,7 +88,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/rancher/dynamiclistener v0.2.1
github.com/rancher/helm-controller v0.7.3
github.com/rancher/kine v0.4.1
github.com/rancher/kine v0.5.0
github.com/rancher/remotedialer v0.2.0
github.com/rancher/wrangler v0.6.1
github.com/rancher/wrangler-api v0.6.0

13
go.sum

@ -498,8 +498,8 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/libopenstorage/openstorage v1.0.0/go.mod h1:Sp1sIObHjat1BeXhfMqLZ14wnOzEhNx2YQedreMcUyc=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
@ -525,9 +525,8 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.13.0 h1:LnJI81JidiW9r7pS/hXe6cFeO5EXNq7KbfvoJLRI69c=
github.com/mattn/go-sqlite3 v1.13.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.4 h1:4rQjbDxdu9fSgI/r3KN72G3c2goxknAqHHgPWWs8UlI=
github.com/mattn/go-sqlite3 v1.14.4/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
@ -677,8 +676,8 @@ github.com/rancher/go-powershell v0.0.0-20200701184732-233247d45373 h1:BePi97poJ
github.com/rancher/go-powershell v0.0.0-20200701184732-233247d45373/go.mod h1:Vz8oLnHgttpo/aZrTpjbcpZEDzzElqNau2zmorToY0E=
github.com/rancher/helm-controller v0.7.3 h1:WTQHcNF2vl9w6Xd1eBtXDe0JUsYMFFstqX9ghGhI5Ac=
github.com/rancher/helm-controller v0.7.3/go.mod h1:ZylsxIMGNADRPRNW+NiBWhrwwks9vnKLQiCHYWb6Bi0=
github.com/rancher/kine v0.4.1 h1:CPtGDXsov5t5onXwhZ97VBpaxDoj1MBHeQwB0TSrUu8=
github.com/rancher/kine v0.4.1/go.mod h1:IImtCJ68AIkE+VY/kUI0NkyJL5q5WzO8QvMsSXqbrpA=
github.com/rancher/kine v0.5.0 h1:ot9ZInMCb0482aWfvO+3gI2B+e9vGxpY12EDJgpowiY=
github.com/rancher/kine v0.5.0/go.mod h1:NoqDMfN0Q+Wu23Kk3MfXfgLO2fE6abLaetejZs9HAYo=
github.com/rancher/kubernetes v1.19.3-k3s1 h1:Tfr1qShnWaNGx4kyBSW5A9rvISgHjEg0KRvvZIV5Zpc=
github.com/rancher/kubernetes v1.19.3-k3s1/go.mod h1:yhT1/ltQajQsha3tnYc9QPFYSumGM45nlZdjf7WqE1A=
github.com/rancher/kubernetes/staging/src/k8s.io/api v1.19.3-k3s1 h1:+C1BPPjbCfFFcStBNUJ1gqIDYxdkvbKuZXm3CTQXFxY=

13
vendor/github.com/lib/pq/.travis.sh generated vendored

@ -70,17 +70,4 @@ postgresql_uninstall() {
sudo rm -rf /var/lib/postgresql
}
megacheck_install() {
# Lock megacheck version at $MEGACHECK_VERSION to prevent spontaneous
# new error messages in old code.
go get -d honnef.co/go/tools/...
git -C $GOPATH/src/honnef.co/go/tools/ checkout $MEGACHECK_VERSION
go install honnef.co/go/tools/cmd/megacheck
megacheck --version
}
golint_install() {
go get golang.org/x/lint/golint
}
$1

16
vendor/github.com/lib/pq/.travis.yml generated vendored

@ -1,9 +1,8 @@
language: go
go:
- 1.9.x
- 1.10.x
- 1.11.x
- 1.13.x
- 1.14.x
- master
sudo: true
@ -14,16 +13,11 @@ env:
- PQGOSSLTESTS=1
- PQSSLCERTTEST_PATH=$PWD/certs
- PGHOST=127.0.0.1
- MEGACHECK_VERSION=2017.2.2
matrix:
- PGVERSION=10
- PGVERSION=9.6
- PGVERSION=9.5
- PGVERSION=9.4
- PGVERSION=9.3
- PGVERSION=9.2
- PGVERSION=9.1
- PGVERSION=9.0
before_install:
- ./.travis.sh postgresql_uninstall
@ -31,9 +25,9 @@ before_install:
- ./.travis.sh postgresql_install
- ./.travis.sh postgresql_configure
- ./.travis.sh client_configure
- ./.travis.sh megacheck_install
- ./.travis.sh golint_install
- go get golang.org/x/tools/cmd/goimports
- go get golang.org/x/lint/golint
- GO111MODULE=on go get honnef.co/go/tools/cmd/staticcheck@2020.1.3
before_script:
- createdb pqgotest
@ -44,7 +38,7 @@ script:
- >
goimports -d -e $(find -name '*.go') | awk '{ print } END { exit NR == 0 ? 0 : 1 }'
- go vet ./...
- megacheck -go 1.9 ./...
- staticcheck -go 1.13 ./...
- golint ./...
- PQTEST_BINARY_PARAMETERS=no go test -race -v ./...
- PQTEST_BINARY_PARAMETERS=yes go test -race -v ./...


@ -1,29 +0,0 @@
## Contributing to pq
`pq` has a backlog of pull requests, but contributions are still very
much welcome. You can help with patch review, submitting bug reports,
or adding new functionality. There is no formal style guide, but
please conform to the style of existing code and general Go formatting
conventions when submitting patches.
### Patch review
Help review existing open pull requests by commenting on the code or
proposed functionality.
### Bug reports
We appreciate any bug reports, but especially ones with self-contained
(doesn't depend on code outside of pq), minimal (can't be simplified
further) test cases. It's especially helpful if you can submit a pull
request with just the failing test case (you'll probably want to
pattern it after the tests in
[conn_test.go](https://github.com/lib/pq/blob/master/conn_test.go).
### New functionality
There are a number of pending patches for new functionality, so
additional feature patches will take a while to merge. Still, patches
are generally reviewed based on usefulness and complexity in addition
to time-in-queue, so if you have a knockout idea, take a shot. Feel
free to open an issue discussion your proposed patch beforehand.

77
vendor/github.com/lib/pq/README.md generated vendored

@ -1,21 +1,11 @@
# pq - A pure Go postgres driver for Go's database/sql package
[![GoDoc](https://godoc.org/github.com/lib/pq?status.svg)](https://godoc.org/github.com/lib/pq)
[![Build Status](https://travis-ci.org/lib/pq.svg?branch=master)](https://travis-ci.org/lib/pq)
[![GoDoc](https://godoc.org/github.com/lib/pq?status.svg)](https://pkg.go.dev/github.com/lib/pq?tab=doc)
## Install
go get github.com/lib/pq
## Docs
For detailed documentation and basic usage examples, please see the package
documentation at <https://godoc.org/github.com/lib/pq>.
## Tests
`go test` is used for testing. See [TESTS.md](TESTS.md) for more details.
## Features
* SSL
@ -29,67 +19,12 @@ documentation at <https://godoc.org/github.com/lib/pq>.
* Unix socket support
* Notifications: `LISTEN`/`NOTIFY`
* pgpass support
* GSS (Kerberos) auth
## Future / Things you can help with
## Tests
* Better COPY FROM / COPY TO (see discussion in #181)
`go test` is used for testing. See [TESTS.md](TESTS.md) for more details.
## Thank you (alphabetical)
## Status
Some of these contributors are from the original library `bmizerany/pq.go` whose
code still exists in here.
* Andy Balholm (andybalholm)
* Ben Berkert (benburkert)
* Benjamin Heatwole (bheatwole)
* Bill Mill (llimllib)
* Bjørn Madsen (aeons)
* Blake Gentry (bgentry)
* Brad Fitzpatrick (bradfitz)
* Charlie Melbye (cmelbye)
* Chris Bandy (cbandy)
* Chris Gilling (cgilling)
* Chris Walsh (cwds)
* Dan Sosedoff (sosedoff)
* Daniel Farina (fdr)
* Eric Chlebek (echlebek)
* Eric Garrido (minusnine)
* Eric Urban (hydrogen18)
* Everyone at The Go Team
* Evan Shaw (edsrzf)
* Ewan Chou (coocood)
* Fazal Majid (fazalmajid)
* Federico Romero (federomero)
* Fumin (fumin)
* Gary Burd (garyburd)
* Heroku (heroku)
* James Pozdena (jpoz)
* Jason McVetta (jmcvetta)
* Jeremy Jay (pbnjay)
* Joakim Sernbrant (serbaut)
* John Gallagher (jgallagher)
* Jonathan Rudenberg (titanous)
* Joël Stemmer (jstemmer)
* Kamil Kisiel (kisielk)
* Kelly Dunn (kellydunn)
* Keith Rarick (kr)
* Kir Shatrov (kirs)
* Lann Martin (lann)
* Maciek Sakrejda (uhoh-itsmaciek)
* Marc Brinkmann (mbr)
* Marko Tiikkaja (johto)
* Matt Newberry (MattNewberry)
* Matt Robenolt (mattrobenolt)
* Martin Olsen (martinolsen)
* Mike Lewis (mikelikespie)
* Nicolas Patry (Narsil)
* Oliver Tonnhofer (olt)
* Patrick Hayes (phayes)
* Paul Hammond (paulhammond)
* Ryan Smith (ryandotsmith)
* Samuel Stauffer (samuel)
* Timothée Peignier (cyberdelia)
* Travis Cline (tmc)
* TruongSinh Tran-Nguyen (truongsinh)
* Yaismel Miranda (ympons)
* notedit (notedit)
This package is effectively in maintenance mode and is not actively developed. Small patches and features are only rarely reviewed and merged. We recommend using [pgx](https://github.com/jackc/pgx) which is actively maintained.

2
vendor/github.com/lib/pq/buf.go generated vendored

@ -66,7 +66,7 @@ func (b *writeBuf) int16(n int) {
}
func (b *writeBuf) string(s string) {
b.buf = append(b.buf, (s + "\000")...)
b.buf = append(append(b.buf, s...), '\000')
}
func (b *writeBuf) byte(c byte) {

131
vendor/github.com/lib/pq/conn.go generated vendored

@ -92,6 +92,7 @@ type Dialer interface {
DialTimeout(network, address string, timeout time.Duration) (net.Conn, error)
}
// DialerContext is the context-aware dialer interface.
type DialerContext interface {
DialContext(ctx context.Context, network, address string) (net.Conn, error)
}
@ -148,6 +149,15 @@ type conn struct {
// If true this connection is in the middle of a COPY
inCopy bool
// If not nil, notices will be synchronously sent here
noticeHandler func(*Error)
// If not nil, notifications will be synchronously sent here
notificationHandler func(*Notification)
// GSSAPI context
gss GSS
}
// Handle driver-side settings in parsed connection string.
@ -301,6 +311,9 @@ func (c *Connector) open(ctx context.Context) (cn *conn, err error) {
err = cn.ssl(o)
if err != nil {
if cn.c != nil {
cn.c.Close()
}
return nil, err
}
@ -325,10 +338,6 @@ func (c *Connector) open(ctx context.Context) (cn *conn, err error) {
func dial(ctx context.Context, d Dialer, o values) (net.Conn, error) {
network, address := network(o)
// SSL is not necessary or supported over UNIX domain sockets
if network == "unix" {
o["sslmode"] = "disable"
}
// Zero or not specified means wait indefinitely.
if timeout, ok := o["connect_timeout"]; ok && timeout != "0" {
@ -546,7 +555,7 @@ func (cn *conn) Commit() (err error) {
// would get the same behaviour if you issued a COMMIT in a failed
// transaction, so it's also the least surprising thing to do here.
if cn.txnStatus == txnStatusInFailedTransaction {
if err := cn.Rollback(); err != nil {
if err := cn.rollback(); err != nil {
return err
}
return ErrInFailedTransaction
@ -573,7 +582,10 @@ func (cn *conn) Rollback() (err error) {
return driver.ErrBadConn
}
defer cn.errRecover(&err)
return cn.rollback()
}
func (cn *conn) rollback() (err error) {
cn.checkIsInTransaction(true)
_, commandTag, err := cn.simpleExec("ROLLBACK")
if err != nil {
@ -964,7 +976,13 @@ func (cn *conn) recv() (t byte, r *readBuf) {
case 'E':
panic(parseError(r))
case 'N':
// ignore
if n := cn.noticeHandler; n != nil {
n(parseError(r))
}
case 'A':
if n := cn.notificationHandler; n != nil {
n(recvNotification(r))
}
default:
return
}
@ -981,8 +999,14 @@ func (cn *conn) recv1Buf(r *readBuf) byte {
}
switch t {
case 'A', 'N':
// ignore
case 'A':
if n := cn.notificationHandler; n != nil {
n(recvNotification(r))
}
case 'N':
if n := cn.noticeHandler; n != nil {
n(parseError(r))
}
case 'S':
cn.processParameterStatus(r)
default:
@ -1050,7 +1074,10 @@ func isDriverSetting(key string) bool {
return true
case "binary_parameters":
return true
case "krbsrvname":
return true
case "krbspn":
return true
default:
return false
}
@ -1130,6 +1157,59 @@ func (cn *conn) auth(r *readBuf, o values) {
if r.int32() != 0 {
errorf("unexpected authentication response: %q", t)
}
case 7: // GSSAPI, startup
if newGss == nil {
errorf("kerberos error: no GSSAPI provider registered (import github.com/lib/pq/auth/kerberos if you need Kerberos support)")
}
cli, err := newGss()
if err != nil {
errorf("kerberos error: %s", err.Error())
}
var token []byte
if spn, ok := o["krbspn"]; ok {
// Use the supplied SPN if provided..
token, err = cli.GetInitTokenFromSpn(spn)
} else {
// Allow the kerberos service name to be overridden
service := "postgres"
if val, ok := o["krbsrvname"]; ok {
service = val
}
token, err = cli.GetInitToken(o["host"], service)
}
if err != nil {
errorf("failed to get Kerberos ticket: %q", err)
}
w := cn.writeBuf('p')
w.bytes(token)
cn.send(w)
// Store for GSSAPI continue message
cn.gss = cli
case 8: // GSSAPI continue
if cn.gss == nil {
errorf("GSSAPI protocol error")
}
b := []byte(*r)
done, tokOut, err := cn.gss.Continue(b)
if err == nil && !done {
w := cn.writeBuf('p')
w.bytes(tokOut)
cn.send(w)
}
// Errors fall through and read the more detailed message
// from the server..
case 10:
sc := scram.NewClient(sha256.New, o["user"], o["password"])
sc.Step(nil)
@ -1500,6 +1580,39 @@ func QuoteIdentifier(name string) string {
return `"` + strings.Replace(name, `"`, `""`, -1) + `"`
}
// QuoteLiteral quotes a 'literal' (e.g. a parameter, often used to pass literal
// to DDL and other statements that do not accept parameters) to be used as part
// of an SQL statement. For example:
//
// exp_date := pq.QuoteLiteral("2023-01-05 15:00:00Z")
// err := db.Exec(fmt.Sprintf("CREATE ROLE my_user VALID UNTIL %s", exp_date))
//
// Any single quotes in name will be escaped. Any backslashes (i.e. "\") will be
// replaced by two backslashes (i.e. "\\") and the C-style escape identifier
// that PostgreSQL provides ('E') will be prepended to the string.
func QuoteLiteral(literal string) string {
// This follows the PostgreSQL internal algorithm for handling quoted literals
// from libpq, which can be found in the "PQEscapeStringInternal" function,
// which is found in the libpq/fe-exec.c source file:
// https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/interfaces/libpq/fe-exec.c
//
// substitute any single-quotes (') with two single-quotes ('')
literal = strings.Replace(literal, `'`, `''`, -1)
// determine if the string has any backslashes (\) in it.
// if it does, replace any backslashes (\) with two backslashes (\\)
// then, we need to wrap the entire string with a PostgreSQL
// C-style escape. Per how "PQEscapeStringInternal" handles this case, we
// also add a space before the "E"
if strings.Contains(literal, `\`) {
literal = strings.Replace(literal, `\`, `\\`, -1)
literal = ` E'` + literal + `'`
} else {
// otherwise, we can just wrap the literal with a pair of single quotes
literal = `'` + literal + `'`
}
return literal
}
func md5s(s string) string {
h := md5.New()
h.Write([]byte(s))
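
The QuoteLiteral helper added above targets statements that cannot take bind parameters (role and other DDL). A minimal sketch of using it follows; the DSN, role name, and timestamp are placeholders, not taken from this commit:

package main

import (
	"database/sql"
	"fmt"
	"log"

	"github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres", "host=127.0.0.1 user=postgres sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// DDL does not accept placeholders, so the value is escaped with
	// QuoteLiteral before being spliced into the statement text.
	expires := pq.QuoteLiteral("2023-01-05 15:00:00Z")
	if _, err := db.Exec(fmt.Sprintf("ALTER ROLE report_user VALID UNTIL %s", expires)); err != nil {
		log.Fatal(err)
	}
}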


@ -79,7 +79,7 @@ func (cn *conn) Ping(ctx context.Context) error {
if finish := cn.watchCancel(ctx); finish != nil {
defer finish()
}
rows, err := cn.simpleQuery("SELECT 'lib/pq ping test';")
rows, err := cn.simpleQuery(";")
if err != nil {
return driver.ErrBadConn // https://golang.org/pkg/database/sql/driver/#Pinger
}


@ -27,7 +27,7 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) {
return c.open(ctx)
}
// Driver returnst the underlying driver of this Connector.
// Driver returns the underlying driver of this Connector.
func (c *Connector) Driver() driver.Driver {
return &Driver{}
}
@ -106,5 +106,10 @@ func NewConnector(dsn string) (*Connector, error) {
o["user"] = u
}
// SSL is not necessary or supported over UNIX domain sockets
if network, _ := network(o); network == "unix" {
o["sslmode"] = "disable"
}
return &Connector{opts: o, dialer: defaultDialer{}}, nil
}

29
vendor/github.com/lib/pq/copy.go generated vendored

@ -49,6 +49,7 @@ type copyin struct {
buffer []byte
rowData chan []byte
done chan bool
driver.Result
closed bool
@ -151,8 +152,12 @@ func (ci *copyin) resploop() {
switch t {
case 'C':
// complete
res, _ := ci.cn.parseComplete(r.string())
ci.setResult(res)
case 'N':
// NoticeResponse
if n := ci.cn.noticeHandler; n != nil {
n(parseError(&r))
}
case 'Z':
ci.cn.processReadyForQuery(&r)
ci.done <- true
@ -199,6 +204,22 @@ func (ci *copyin) setError(err error) {
ci.Unlock()
}
func (ci *copyin) setResult(result driver.Result) {
ci.Lock()
ci.Result = result
ci.Unlock()
}
func (ci *copyin) getResult() driver.Result {
ci.Lock()
result := ci.Result
if result == nil {
return driver.RowsAffected(0)
}
ci.Unlock()
return result
}
func (ci *copyin) NumInput() int {
return -1
}
@ -229,7 +250,11 @@ func (ci *copyin) Exec(v []driver.Value) (r driver.Result, err error) {
}
if len(v) == 0 {
return nil, ci.Close()
if err := ci.Close(); err != nil {
return driver.RowsAffected(0), err
}
return ci.getResult(), nil
}
numValues := len(v)
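
With the setResult/getResult plumbing above, the driver.Result returned by the terminating Exec of a COPY now carries the server-reported row count. A sketch of reading it; the DSN, table, and column names are placeholders:

package main

import (
	"database/sql"
	"log"

	"github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres", "host=127.0.0.1 user=postgres sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	txn, err := db.Begin()
	if err != nil {
		log.Fatal(err)
	}
	stmt, err := txn.Prepare(pq.CopyIn("users", "name", "age"))
	if err != nil {
		log.Fatal(err)
	}
	if _, err := stmt.Exec("alice", 30); err != nil {
		log.Fatal(err)
	}
	// The final Exec with no arguments ends the COPY; its result now
	// exposes how many rows the server ingested.
	res, err := stmt.Exec()
	if err != nil {
		log.Fatal(err)
	}
	copied, _ := res.RowsAffected()
	log.Printf("copied %d rows", copied)
	if err := stmt.Close(); err != nil {
		log.Fatal(err)
	}
	if err := txn.Commit(); err != nil {
		log.Fatal(err)
	}
}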

23
vendor/github.com/lib/pq/doc.go generated vendored

@ -241,5 +241,28 @@ bytes by the PostgreSQL server.
You can find a complete, working example of Listener usage at
https://godoc.org/github.com/lib/pq/example/listen.
Kerberos Support
If you need support for Kerberos authentication, add the following to your main
package:
import "github.com/lib/pq/auth/kerberos"
func init() {
pq.RegisterGSSProvider(func() (pq.Gss, error) { return kerberos.NewGSS() })
}
This package is in a separate module so that users who don't need Kerberos
don't have to download unnecessary dependencies.
When imported, additional connection string parameters are supported:
* krbsrvname - GSS (Kerberos) service name when constructing the
SPN (default is `postgres`). This will be combined with the host
to form the full SPN: `krbsrvname/host`.
* krbspn - GSS (Kerberos) SPN. This takes priority over
`krbsrvname` if present.
*/
package pq
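
A sketch of what a program using the optional Kerberos module described above might look like; the DSN values and service name are placeholders, and the separate github.com/lib/pq/auth/kerberos module must be added to go.mod:

package main

import (
	"database/sql"
	"log"

	"github.com/lib/pq"
	"github.com/lib/pq/auth/kerberos"
)

func init() {
	// Register the GSSAPI provider from the optional kerberos module.
	// Note: krb.go exports the interface as pq.GSS (the doc comment above
	// writes pq.Gss).
	pq.RegisterGSSProvider(func() (pq.GSS, error) { return kerberos.NewGSS() })
}

func main() {
	// krbsrvname overrides the default "postgres" service name used when
	// the SPN is built from the host; krbspn would take priority if set.
	db, err := sql.Open("postgres", "host=db.example.com user=svc_user krbsrvname=postgres sslmode=require")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := db.Ping(); err != nil {
		log.Fatal(err)
	}
}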

22
vendor/github.com/lib/pq/encode.go generated vendored

@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"math"
"regexp"
"strconv"
"strings"
"sync"
@ -16,6 +17,8 @@ import (
"github.com/lib/pq/oid"
)
var time2400Regex = regexp.MustCompile(`^(24:00(?::00(?:\.0+)?)?)(?:[Z+-].*)?$`)
func binaryEncode(parameterStatus *parameterStatus, x interface{}) []byte {
switch v := x.(type) {
case []byte:
@ -202,10 +205,27 @@ func mustParse(f string, typ oid.Oid, s []byte) time.Time {
str[len(str)-3] == ':' {
f += ":00"
}
// Special case for 24:00 time.
// Unfortunately, golang does not parse 24:00 as a proper time.
// In this case, we want to try "round to the next day", to differentiate.
// As such, we find if the 24:00 time matches at the beginning; if so,
// we default it back to 00:00 but add a day later.
var is2400Time bool
switch typ {
case oid.T_timetz, oid.T_time:
if matches := time2400Regex.FindStringSubmatch(str); matches != nil {
// Concatenate timezone information at the back.
str = "00:00:00" + str[len(matches[1]):]
is2400Time = true
}
}
t, err := time.Parse(f, str)
if err != nil {
errorf("decode: %s", err)
}
if is2400Time {
t = t.Add(24 * time.Hour)
}
return t
}
@ -487,7 +507,7 @@ func FormatTimestamp(t time.Time) []byte {
b := []byte(t.Format("2006-01-02 15:04:05.999999999Z07:00"))
_, offset := t.Zone()
offset = offset % 60
offset %= 60
if offset != 0 {
// RFC3339Nano already printed the minus sign
if offset < 0 {

10
vendor/github.com/lib/pq/error.go generated vendored

@ -478,13 +478,13 @@ func errRecoverNoErrBadConn(err *error) {
}
}
func (c *conn) errRecover(err *error) {
func (cn *conn) errRecover(err *error) {
e := recover()
switch v := e.(type) {
case nil:
// Do nothing
case runtime.Error:
c.bad = true
cn.bad = true
panic(v)
case *Error:
if v.Fatal() {
@ -493,7 +493,7 @@ func (c *conn) errRecover(err *error) {
*err = v
}
case *net.OpError:
c.bad = true
cn.bad = true
*err = v
case error:
if v == io.EOF || v.(error).Error() == "remote error: handshake failure" {
@ -503,13 +503,13 @@ func (c *conn) errRecover(err *error) {
}
default:
c.bad = true
cn.bad = true
panic(fmt.Sprintf("unknown error: %#v", e))
}
// Any time we return ErrBadConn, we need to remember it since *Tx doesn't
// mark the connection bad in database/sql.
if *err == driver.ErrBadConn {
c.bad = true
cn.bad = true
}
}

2
vendor/github.com/lib/pq/go.mod generated vendored

@ -1 +1,3 @@
module github.com/lib/pq
go 1.13

27
vendor/github.com/lib/pq/krb.go generated vendored Normal file

@ -0,0 +1,27 @@
package pq
// NewGSSFunc creates a GSS authentication provider, for use with
// RegisterGSSProvider.
type NewGSSFunc func() (GSS, error)
var newGss NewGSSFunc
// RegisterGSSProvider registers a GSS authentication provider. For example, if
// you need to use Kerberos to authenticate with your server, add this to your
// main package:
//
// import "github.com/lib/pq/auth/kerberos"
//
// func init() {
// pq.RegisterGSSProvider(func() (pq.GSS, error) { return kerberos.NewGSS() })
// }
func RegisterGSSProvider(newGssArg NewGSSFunc) {
newGss = newGssArg
}
// GSS provides GSSAPI authentication (e.g., Kerberos).
type GSS interface {
GetInitToken(host string, service string) ([]byte, error)
GetInitTokenFromSpn(spn string) ([]byte, error)
Continue(inToken []byte) (done bool, outToken []byte, err error)
}

71
vendor/github.com/lib/pq/notice.go generated vendored Normal file

@ -0,0 +1,71 @@
// +build go1.10
package pq
import (
"context"
"database/sql/driver"
)
// NoticeHandler returns the notice handler on the given connection, if any. A
// runtime panic occurs if c is not a pq connection. This is rarely used
// directly, use ConnectorNoticeHandler and ConnectorWithNoticeHandler instead.
func NoticeHandler(c driver.Conn) func(*Error) {
return c.(*conn).noticeHandler
}
// SetNoticeHandler sets the given notice handler on the given connection. A
// runtime panic occurs if c is not a pq connection. A nil handler may be used
// to unset it. This is rarely used directly, use ConnectorNoticeHandler and
// ConnectorWithNoticeHandler instead.
//
// Note: Notice handlers are executed synchronously by pq meaning commands
// won't continue to be processed until the handler returns.
func SetNoticeHandler(c driver.Conn, handler func(*Error)) {
c.(*conn).noticeHandler = handler
}
// NoticeHandlerConnector wraps a regular connector and sets a notice handler
// on it.
type NoticeHandlerConnector struct {
driver.Connector
noticeHandler func(*Error)
}
// Connect calls the underlying connector's connect method and then sets the
// notice handler.
func (n *NoticeHandlerConnector) Connect(ctx context.Context) (driver.Conn, error) {
c, err := n.Connector.Connect(ctx)
if err == nil {
SetNoticeHandler(c, n.noticeHandler)
}
return c, err
}
// ConnectorNoticeHandler returns the currently set notice handler, if any. If
// the given connector is not a result of ConnectorWithNoticeHandler, nil is
// returned.
func ConnectorNoticeHandler(c driver.Connector) func(*Error) {
if c, ok := c.(*NoticeHandlerConnector); ok {
return c.noticeHandler
}
return nil
}
// ConnectorWithNoticeHandler creates or sets the given handler for the given
// connector. If the given connector is a result of calling this function
// previously, it is simply set on the given connector and returned. Otherwise,
// this returns a new connector wrapping the given one and setting the notice
// handler. A nil notice handler may be used to unset it.
//
// The returned connector is intended to be used with database/sql.OpenDB.
//
// Note: Notice handlers are executed synchronously by pq meaning commands
// won't continue to be processed until the handler returns.
func ConnectorWithNoticeHandler(c driver.Connector, handler func(*Error)) *NoticeHandlerConnector {
if c, ok := c.(*NoticeHandlerConnector); ok {
c.noticeHandler = handler
return c
}
return &NoticeHandlerConnector{Connector: c, noticeHandler: handler}
}
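
A sketch of wiring the new notice handler into database/sql via a connector; the DSN and the notice text are placeholders:

package main

import (
	"database/sql"
	"log"

	"github.com/lib/pq"
)

func main() {
	base, err := pq.NewConnector("host=127.0.0.1 user=postgres sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	// Notices (RAISE NOTICE, etc.) are delivered synchronously to the
	// handler while the triggering command is being processed.
	connector := pq.ConnectorWithNoticeHandler(base, func(n *pq.Error) {
		log.Printf("notice: %s", n.Message)
	})
	db := sql.OpenDB(connector)
	defer db.Close()

	if _, err := db.Exec(`DO $$ BEGIN RAISE NOTICE 'hello'; END $$`); err != nil {
		log.Fatal(err)
	}
}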

63
vendor/github.com/lib/pq/notify.go generated vendored

@ -4,6 +4,8 @@ package pq
// This module contains support for Postgres LISTEN/NOTIFY.
import (
"context"
"database/sql/driver"
"errors"
"fmt"
"sync"
@ -29,6 +31,61 @@ func recvNotification(r *readBuf) *Notification {
return &Notification{bePid, channel, extra}
}
// SetNotificationHandler sets the given notification handler on the given
// connection. A runtime panic occurs if c is not a pq connection. A nil handler
// may be used to unset it.
//
// Note: Notification handlers are executed synchronously by pq meaning commands
// won't continue to be processed until the handler returns.
func SetNotificationHandler(c driver.Conn, handler func(*Notification)) {
c.(*conn).notificationHandler = handler
}
// NotificationHandlerConnector wraps a regular connector and sets a notification handler
// on it.
type NotificationHandlerConnector struct {
driver.Connector
notificationHandler func(*Notification)
}
// Connect calls the underlying connector's connect method and then sets the
// notification handler.
func (n *NotificationHandlerConnector) Connect(ctx context.Context) (driver.Conn, error) {
c, err := n.Connector.Connect(ctx)
if err == nil {
SetNotificationHandler(c, n.notificationHandler)
}
return c, err
}
// ConnectorNotificationHandler returns the currently set notification handler, if any. If
// the given connector is not a result of ConnectorWithNotificationHandler, nil is
// returned.
func ConnectorNotificationHandler(c driver.Connector) func(*Notification) {
if c, ok := c.(*NotificationHandlerConnector); ok {
return c.notificationHandler
}
return nil
}
// ConnectorWithNotificationHandler creates or sets the given handler for the given
// connector. If the given connector is a result of calling this function
// previously, it is simply set on the given connector and returned. Otherwise,
// this returns a new connector wrapping the given one and setting the notification
// handler. A nil notification handler may be used to unset it.
//
// The returned connector is intended to be used with database/sql.OpenDB.
//
// Note: Notification handlers are executed synchronously by pq meaning commands
// won't continue to be processed until the handler returns.
func ConnectorWithNotificationHandler(c driver.Connector, handler func(*Notification)) *NotificationHandlerConnector {
if c, ok := c.(*NotificationHandlerConnector); ok {
c.notificationHandler = handler
return c
}
return &NotificationHandlerConnector{Connector: c, notificationHandler: handler}
}
const (
connStateIdle int32 = iota
connStateExpectResponse
@ -174,8 +231,12 @@ func (l *ListenerConn) listenerConnLoop() (err error) {
}
l.replyChan <- message{t, nil}
case 'N', 'S':
case 'S':
// ignore
case 'N':
if n := l.cn.noticeHandler; n != nil {
n(parseError(r))
}
default:
return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
}
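
notify.go gains the same connector-based plumbing for asynchronous notifications. A sketch, with the DSN and channel name as placeholders:

package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	"github.com/lib/pq"
)

func main() {
	base, err := pq.NewConnector("host=127.0.0.1 user=postgres sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	connector := pq.ConnectorWithNotificationHandler(base, func(n *pq.Notification) {
		log.Printf("notification on %q: %s", n.Channel, n.Extra)
	})
	db := sql.OpenDB(connector)
	defer db.Close()

	// Pin a single pooled connection so LISTEN and the follow-up traffic
	// share the session that receives the notifications.
	ctx := context.Background()
	conn, err := db.Conn(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	if _, err := conn.ExecContext(ctx, "LISTEN jobs"); err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 5; i++ {
		// Each round trip gives the driver a chance to read and dispatch
		// pending notifications; pq.Listener remains the usual choice for
		// blocking waits.
		if _, err := conn.ExecContext(ctx, "SELECT 1"); err != nil {
			log.Fatal(err)
		}
		time.Sleep(time.Second)
	}
}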


@ -22,7 +22,7 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Pacakage scram implements a SCRAM-{SHA-1,etc} client per RFC5802.
// Package scram implements a SCRAM-{SHA-1,etc} client per RFC5802.
//
// http://tools.ietf.org/html/rfc5802
//
@ -94,7 +94,7 @@ func (c *Client) Out() []byte {
return c.out.Bytes()
}
// Err returns the error that ocurred, or nil if there were no errors.
// Err returns the error that occurred, or nil if there were no errors.
func (c *Client) Err() error {
return c.err
}


@ -1,6 +1,6 @@
// Package pq is a pure Go Postgres driver for the database/sql package.
// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris rumprun
// +build aix darwin dragonfly freebsd linux nacl netbsd openbsd plan9 solaris rumprun
package pq

4
vendor/github.com/mattn/go-sqlite3/.codecov.yml generated vendored Normal file

@ -0,0 +1,4 @@
coverage:
status:
project: off
patch: off


@ -1,33 +0,0 @@
language: go
os:
- linux
- osx
addons:
apt:
update: true
go:
- 1.9.x
- 1.10.x
- 1.11.x
- 1.12.x
- 1.13.x
- master
before_install:
- |
if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then
brew update
fi
- go get github.com/smartystreets/goconvey
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
script:
- $HOME/gopath/bin/goveralls -repotoken 3qJVUE0iQwqnCbmNcDsjYu1nh4J4KIFXx
- go test -race -v . -tags ""
- go test -race -v . -tags "libsqlite3"
- go test -race -v . -tags "sqlite_allow_uri_authority sqlite_app_armor sqlite_foreign_keys sqlite_fts5 sqlite_icu sqlite_introspect sqlite_json sqlite_preupdate_hook sqlite_secure_delete sqlite_see sqlite_stat4 sqlite_trace sqlite_userauth sqlite_vacuum_incr sqlite_vtable sqlite_unlock_notify"
- go test -race -v . -tags "sqlite_vacuum_full"


@ -2,16 +2,20 @@ go-sqlite3
==========
[![GoDoc Reference](https://godoc.org/github.com/mattn/go-sqlite3?status.svg)](http://godoc.org/github.com/mattn/go-sqlite3)
[![Build Status](https://travis-ci.org/mattn/go-sqlite3.svg?branch=master)](https://travis-ci.org/mattn/go-sqlite3)
[![GitHub Actions](https://github.com/mattn/go-sqlite3/workflows/Go/badge.svg)](https://github.com/mattn/go-sqlite3/actions?query=workflow%3AGo)
[![Financial Contributors on Open Collective](https://opencollective.com/mattn-go-sqlite3/all/badge.svg?label=financial+contributors)](https://opencollective.com/mattn-go-sqlite3)
[![Coverage Status](https://coveralls.io/repos/mattn/go-sqlite3/badge.svg?branch=master)](https://coveralls.io/r/mattn/go-sqlite3?branch=master)
[![codecov](https://codecov.io/gh/mattn/go-sqlite3/branch/master/graph/badge.svg)](https://codecov.io/gh/mattn/go-sqlite3)
[![Go Report Card](https://goreportcard.com/badge/github.com/mattn/go-sqlite3)](https://goreportcard.com/report/github.com/mattn/go-sqlite3)
Latest stable version is v1.14 or later not v2.
~~**NOTE:** The increase to v2 was an accident. There were no major changes or features.~~
# Description
sqlite3 driver conforming to the built-in database/sql interface
Supported Golang version: See .travis.yml
Supported Golang version: See .github/workflows/go.yaml
[This package follows the official Golang Release Policy.](https://golang.org/doc/devel/release.html#policy)
@ -210,9 +214,15 @@ This library can be cross-compiled.
In some cases you are required to the `CC` environment variable with the cross compiler.
Additional information:
- [#491](https://github.com/mattn/go-sqlite3/issues/491)
- [#560](https://github.com/mattn/go-sqlite3/issues/560)
## Cross Compiling from MAC OSX
The simplest way to cross compile from OSX is to use [xgo](https://github.com/karalabe/xgo).
Steps:
- Install [xgo](https://github.com/karalabe/xgo) (`go get github.com/karalabe/xgo`).
- Ensure that your project is within your `GOPATH`.
- Run `xgo local/path/to/project`.
Please refer to the project's [README](https://github.com/karalabe/xgo/blob/master/README.md) for further information.
# Google Cloud Platform
@ -451,6 +461,16 @@ If you want your own extension to be listed here or you want to add a reference
Spatialite is available as an extension to SQLite, and can be used in combination with this repository.
For an example see [shaxbee/go-spatialite](https://github.com/shaxbee/go-spatialite).
## extension-functions.c from SQLite3 Contrib
extension-functions.c is available as an extension to SQLite, and provides the following functions:
- Math: acos, asin, atan, atn2, atan2, acosh, asinh, atanh, difference, degrees, radians, cos, sin, tan, cot, cosh, sinh, tanh, coth, exp, log, log10, power, sign, sqrt, square, ceil, floor, pi.
- String: replicate, charindex, leftstr, rightstr, ltrim, rtrim, trim, replace, reverse, proper, padl, padr, padc, strfilter.
- Aggregate: stdev, variance, mode, median, lower_quartile, upper_quartile
For an example see [dinedal/go-sqlite3-extension-functions](https://github.com/dinedal/go-sqlite3-extension-functions).
# FAQ
- Getting insert error while query is opened.


@ -35,56 +35,55 @@ import (
//export callbackTrampoline
func callbackTrampoline(ctx *C.sqlite3_context, argc int, argv **C.sqlite3_value) {
args := (*[(math.MaxInt32 - 1) / unsafe.Sizeof((*C.sqlite3_value)(nil))]*C.sqlite3_value)(unsafe.Pointer(argv))[:argc:argc]
fi := lookupHandle(uintptr(C.sqlite3_user_data(ctx))).(*functionInfo)
fi := lookupHandle(C.sqlite3_user_data(ctx)).(*functionInfo)
fi.Call(ctx, args)
}
//export stepTrampoline
func stepTrampoline(ctx *C.sqlite3_context, argc C.int, argv **C.sqlite3_value) {
args := (*[(math.MaxInt32 - 1) / unsafe.Sizeof((*C.sqlite3_value)(nil))]*C.sqlite3_value)(unsafe.Pointer(argv))[:int(argc):int(argc)]
ai := lookupHandle(uintptr(C.sqlite3_user_data(ctx))).(*aggInfo)
ai := lookupHandle(C.sqlite3_user_data(ctx)).(*aggInfo)
ai.Step(ctx, args)
}
//export doneTrampoline
func doneTrampoline(ctx *C.sqlite3_context) {
handle := uintptr(C.sqlite3_user_data(ctx))
ai := lookupHandle(handle).(*aggInfo)
ai := lookupHandle(C.sqlite3_user_data(ctx)).(*aggInfo)
ai.Done(ctx)
}
//export compareTrampoline
func compareTrampoline(handlePtr uintptr, la C.int, a *C.char, lb C.int, b *C.char) C.int {
func compareTrampoline(handlePtr unsafe.Pointer, la C.int, a *C.char, lb C.int, b *C.char) C.int {
cmp := lookupHandle(handlePtr).(func(string, string) int)
return C.int(cmp(C.GoStringN(a, la), C.GoStringN(b, lb)))
}
//export commitHookTrampoline
func commitHookTrampoline(handle uintptr) int {
func commitHookTrampoline(handle unsafe.Pointer) int {
callback := lookupHandle(handle).(func() int)
return callback()
}
//export rollbackHookTrampoline
func rollbackHookTrampoline(handle uintptr) {
func rollbackHookTrampoline(handle unsafe.Pointer) {
callback := lookupHandle(handle).(func())
callback()
}
//export updateHookTrampoline
func updateHookTrampoline(handle uintptr, op int, db *C.char, table *C.char, rowid int64) {
func updateHookTrampoline(handle unsafe.Pointer, op int, db *C.char, table *C.char, rowid int64) {
callback := lookupHandle(handle).(func(int, string, string, int64))
callback(op, C.GoString(db), C.GoString(table), rowid)
}
//export authorizerTrampoline
func authorizerTrampoline(handle uintptr, op int, arg1 *C.char, arg2 *C.char, arg3 *C.char) int {
func authorizerTrampoline(handle unsafe.Pointer, op int, arg1 *C.char, arg2 *C.char, arg3 *C.char) int {
callback := lookupHandle(handle).(func(int, string, string, string) int)
return callback(op, C.GoString(arg1), C.GoString(arg2), C.GoString(arg3))
}
//export preUpdateHookTrampoline
func preUpdateHookTrampoline(handle uintptr, dbHandle uintptr, op int, db *C.char, table *C.char, oldrowid int64, newrowid int64) {
func preUpdateHookTrampoline(handle unsafe.Pointer, dbHandle uintptr, op int, db *C.char, table *C.char, oldrowid int64, newrowid int64) {
hval := lookupHandleVal(handle)
data := SQLitePreUpdateData{
Conn: hval.db,
@ -105,33 +104,27 @@ type handleVal struct {
}
var handleLock sync.Mutex
var handleVals = make(map[uintptr]handleVal)
var handleIndex uintptr = 100
var handleVals = make(map[unsafe.Pointer]handleVal)
func newHandle(db *SQLiteConn, v interface{}) uintptr {
func newHandle(db *SQLiteConn, v interface{}) unsafe.Pointer {
handleLock.Lock()
defer handleLock.Unlock()
i := handleIndex
handleIndex++
handleVals[i] = handleVal{db, v}
return i
}
func lookupHandleVal(handle uintptr) handleVal {
handleLock.Lock()
defer handleLock.Unlock()
r, ok := handleVals[handle]
if !ok {
if handle >= 100 && handle < handleIndex {
panic("deleted handle")
} else {
panic("invalid handle")
}
val := handleVal{db: db, val: v}
var p unsafe.Pointer = C.malloc(C.size_t(1))
if p == nil {
panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
}
return r
handleVals[p] = val
return p
}
func lookupHandle(handle uintptr) interface{} {
func lookupHandleVal(handle unsafe.Pointer) handleVal {
handleLock.Lock()
defer handleLock.Unlock()
return handleVals[handle]
}
func lookupHandle(handle unsafe.Pointer) interface{} {
return lookupHandleVal(handle).val
}
@ -141,6 +134,7 @@ func deleteHandles(db *SQLiteConn) {
for handle, val := range handleVals {
if val.db == db {
delete(handleVals, handle)
C.free(handle)
}
}
}


@ -5,7 +5,15 @@
package sqlite3
/*
#ifndef USE_LIBSQLITE3
#include <sqlite3-binding.h>
#else
#include <sqlite3.h>
#endif
*/
import "C"
import "syscall"
// ErrNo inherit errno.
type ErrNo int
@ -20,6 +28,7 @@ type ErrNoExtended int
type Error struct {
Code ErrNo /* The error code returned by SQLite */
ExtendedCode ErrNoExtended /* The extended error code returned by SQLite */
SystemErrno syscall.Errno /* The system errno returned by the OS through SQLite, if applicable */
err string /* The error string returned by sqlite3_errmsg(),
this usually contains more specific details. */
}
@ -72,10 +81,16 @@ func (err ErrNoExtended) Error() string {
}
func (err Error) Error() string {
var str string
if err.err != "" {
return err.err
str = err.err
} else {
str = C.GoString(C.sqlite3_errstr(C.int(err.Code)))
}
return errorString(err)
if err.SystemErrno != 0 {
str += ": " + err.SystemErrno.Error()
}
return str
}
// result codes from http://www.sqlite.org/c3ref/c_abort_rollback.html
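
With the SystemErrno field added above, open failures can now be inspected for the underlying OS error. A sketch; the path is deliberately unopenable and the field names follow the diff:

package main

import (
	"database/sql"
	"log"

	sqlite3 "github.com/mattn/go-sqlite3"
)

func main() {
	db, err := sql.Open("sqlite3", "/nonexistent/dir/data.db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// sql.Open is lazy; the driver error surfaces on first use.
	if err := db.Ping(); err != nil {
		if serr, ok := err.(sqlite3.Error); ok {
			log.Printf("code=%v extended=%v system errno=%v",
				serr.Code, serr.ExtendedCode, serr.SystemErrno)
		} else {
			log.Printf("open failed: %v", err)
		}
	}
}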

3
vendor/github.com/mattn/go-sqlite3/go.mod generated vendored Normal file

@ -0,0 +1,3 @@
module github.com/mattn/go-sqlite3
go 1.10

0
vendor/github.com/mattn/go-sqlite3/go.sum generated vendored Normal file

File diff suppressed because it is too large

File diff suppressed because it is too large


@ -183,6 +183,12 @@ static int _sqlite3_limit(sqlite3* db, int limitId, int newLimit) {
return sqlite3_limit(db, limitId, newLimit);
#endif
}
#if SQLITE_VERSION_NUMBER < 3012000
static int sqlite3_system_errno(sqlite3 *db) {
return 0;
}
#endif
*/
import "C"
import (
@ -198,6 +204,7 @@ import (
"strconv"
"strings"
"sync"
"syscall"
"time"
"unsafe"
)
@ -466,7 +473,7 @@ func (c *SQLiteConn) RegisterCollation(name string, cmp func(string, string) int
handle := newHandle(c, cmp)
cname := C.CString(name)
defer C.free(unsafe.Pointer(cname))
rv := C.sqlite3_create_collation(c.db, cname, C.SQLITE_UTF8, unsafe.Pointer(handle), (*[0]byte)(unsafe.Pointer(C.compareTrampoline)))
rv := C.sqlite3_create_collation(c.db, cname, C.SQLITE_UTF8, handle, (*[0]byte)(unsafe.Pointer(C.compareTrampoline)))
if rv != C.SQLITE_OK {
return c.lastError()
}
@ -484,7 +491,7 @@ func (c *SQLiteConn) RegisterCommitHook(callback func() int) {
if callback == nil {
C.sqlite3_commit_hook(c.db, nil, nil)
} else {
C.sqlite3_commit_hook(c.db, (*[0]byte)(C.commitHookTrampoline), unsafe.Pointer(newHandle(c, callback)))
C.sqlite3_commit_hook(c.db, (*[0]byte)(C.commitHookTrampoline), newHandle(c, callback))
}
}
@ -497,7 +504,7 @@ func (c *SQLiteConn) RegisterRollbackHook(callback func()) {
if callback == nil {
C.sqlite3_rollback_hook(c.db, nil, nil)
} else {
C.sqlite3_rollback_hook(c.db, (*[0]byte)(C.rollbackHookTrampoline), unsafe.Pointer(newHandle(c, callback)))
C.sqlite3_rollback_hook(c.db, (*[0]byte)(C.rollbackHookTrampoline), newHandle(c, callback))
}
}
@ -514,7 +521,7 @@ func (c *SQLiteConn) RegisterUpdateHook(callback func(int, string, string, int64
if callback == nil {
C.sqlite3_update_hook(c.db, nil, nil)
} else {
C.sqlite3_update_hook(c.db, (*[0]byte)(C.updateHookTrampoline), unsafe.Pointer(newHandle(c, callback)))
C.sqlite3_update_hook(c.db, (*[0]byte)(C.updateHookTrampoline), newHandle(c, callback))
}
}
@ -528,7 +535,7 @@ func (c *SQLiteConn) RegisterAuthorizer(callback func(int, string, string, strin
if callback == nil {
C.sqlite3_set_authorizer(c.db, nil, nil)
} else {
C.sqlite3_set_authorizer(c.db, (*[0]byte)(C.authorizerTrampoline), unsafe.Pointer(newHandle(c, callback)))
C.sqlite3_set_authorizer(c.db, (*[0]byte)(C.authorizerTrampoline), newHandle(c, callback))
}
}
@ -609,8 +616,8 @@ func (c *SQLiteConn) RegisterFunc(name string, impl interface{}, pure bool) erro
return nil
}
func sqlite3CreateFunction(db *C.sqlite3, zFunctionName *C.char, nArg C.int, eTextRep C.int, pApp uintptr, xFunc unsafe.Pointer, xStep unsafe.Pointer, xFinal unsafe.Pointer) C.int {
return C._sqlite3_create_function(db, zFunctionName, nArg, eTextRep, C.uintptr_t(pApp), (*[0]byte)(xFunc), (*[0]byte)(xStep), (*[0]byte)(xFinal))
func sqlite3CreateFunction(db *C.sqlite3, zFunctionName *C.char, nArg C.int, eTextRep C.int, pApp unsafe.Pointer, xFunc unsafe.Pointer, xStep unsafe.Pointer, xFinal unsafe.Pointer) C.int {
return C._sqlite3_create_function(db, zFunctionName, nArg, eTextRep, C.uintptr_t(uintptr(pApp)), (*[0]byte)(xFunc), (*[0]byte)(xStep), (*[0]byte)(xFinal))
}
// RegisterAggregator makes a Go type available as a SQLite aggregation function.
@ -749,15 +756,28 @@ func (c *SQLiteConn) lastError() error {
return lastError(c.db)
}
// Note: may be called with db == nil
func lastError(db *C.sqlite3) error {
rv := C.sqlite3_errcode(db)
rv := C.sqlite3_errcode(db) // returns SQLITE_NOMEM if db == nil
if rv == C.SQLITE_OK {
return nil
}
extrv := C.sqlite3_extended_errcode(db) // returns SQLITE_NOMEM if db == nil
errStr := C.GoString(C.sqlite3_errmsg(db)) // returns "out of memory" if db == nil
// https://www.sqlite.org/c3ref/system_errno.html
// sqlite3_system_errno is only meaningful if the error code was SQLITE_CANTOPEN,
// or it was SQLITE_IOERR and the extended code was not SQLITE_IOERR_NOMEM
var systemErrno syscall.Errno
if rv == C.SQLITE_CANTOPEN || (rv == C.SQLITE_IOERR && extrv != C.SQLITE_IOERR_NOMEM) {
systemErrno = syscall.Errno(C.sqlite3_system_errno(db))
}
return Error{
Code: ErrNo(rv),
ExtendedCode: ErrNoExtended(C.sqlite3_extended_errcode(db)),
err: C.GoString(C.sqlite3_errmsg(db)),
ExtendedCode: ErrNoExtended(extrv),
SystemErrno: systemErrno,
err: errStr,
}
}
@ -782,20 +802,29 @@ func (c *SQLiteConn) exec(ctx context.Context, query string, args []namedValue)
}
var res driver.Result
if s.(*SQLiteStmt).s != nil {
stmtArgs := make([]namedValue, 0, len(args))
na := s.NumInput()
if len(args) < na {
if len(args)-start < na {
s.Close()
return nil, fmt.Errorf("not enough args to execute query: want %d got %d", na, len(args))
}
for i := 0; i < na; i++ {
args[i].Ordinal -= start
// consume the number of arguments used in the current
// statement and append all named arguments not
// contained therein
stmtArgs = append(stmtArgs, args[start:start+na]...)
for i := range args {
if (i < start || i >= na) && args[i].Name != "" {
stmtArgs = append(stmtArgs, args[i])
}
}
res, err = s.(*SQLiteStmt).exec(ctx, args[:na])
for i := range stmtArgs {
stmtArgs[i].Ordinal = i + 1
}
res, err = s.(*SQLiteStmt).exec(ctx, stmtArgs)
if err != nil && err != driver.ErrSkip {
s.Close()
return nil, err
}
args = args[na:]
start += na
}
tail := s.(*SQLiteStmt).t
@ -828,24 +857,33 @@ func (c *SQLiteConn) Query(query string, args []driver.Value) (driver.Rows, erro
func (c *SQLiteConn) query(ctx context.Context, query string, args []namedValue) (driver.Rows, error) {
start := 0
for {
stmtArgs := make([]namedValue, 0, len(args))
s, err := c.prepare(ctx, query)
if err != nil {
return nil, err
}
s.(*SQLiteStmt).cls = true
na := s.NumInput()
if len(args) < na {
return nil, fmt.Errorf("not enough args to execute query: want %d got %d", na, len(args))
if len(args)-start < na {
return nil, fmt.Errorf("not enough args to execute query: want %d got %d", na, len(args)-start)
}
for i := 0; i < na; i++ {
args[i].Ordinal -= start
// consume the number of arguments used in the current
// statement and append all named arguments not contained
// therein
stmtArgs = append(stmtArgs, args[start:start+na]...)
for i := range args {
if (i < start || i >= na) && args[i].Name != "" {
stmtArgs = append(stmtArgs, args[i])
}
}
rows, err := s.(*SQLiteStmt).query(ctx, args[:na])
for i := range stmtArgs {
stmtArgs[i].Ordinal = i + 1
}
rows, err := s.(*SQLiteStmt).query(ctx, stmtArgs)
if err != nil && err != driver.ErrSkip {
s.Close()
return rows, err
}
args = args[na:]
start += na
tail := s.(*SQLiteStmt).t
if tail == "" {
@ -869,10 +907,6 @@ func (c *SQLiteConn) begin(ctx context.Context) (driver.Tx, error) {
return &SQLiteTx{c}, nil
}
func errorString(err Error) string {
return C.GoString(C.sqlite3_errstr(C.int(err.Code)))
}
// Open database and return a new connection.
//
// A pragma can take either zero or one argument.
@ -897,7 +931,7 @@ func errorString(err Error) string {
// - rwc
// - memory
//
// shared
// cache
// SQLite Shared-Cache Mode
// https://www.sqlite.org/sharedcache.html
// Values:
@ -1342,10 +1376,13 @@ func (d *SQLiteDriver) Open(dsn string) (driver.Conn, error) {
mutex|C.SQLITE_OPEN_READWRITE|C.SQLITE_OPEN_CREATE,
nil)
if rv != 0 {
// Save off the error _before_ closing the database.
// This is safe even if db is nil.
err := lastError(db)
if db != nil {
C.sqlite3_close_v2(db)
}
return nil, Error{Code: ErrNo(rv)}
return nil, err
}
if db == nil {
return nil, errors.New("sqlite succeeded without returning a database")
@ -1759,11 +1796,6 @@ func (s *SQLiteStmt) NumInput() int {
return int(C.sqlite3_bind_parameter_count(s.s))
}
type bindArg struct {
n int
v driver.Value
}
var placeHolder = []byte{0}
func (s *SQLiteStmt) bind(args []namedValue) error {
@ -1772,52 +1804,63 @@ func (s *SQLiteStmt) bind(args []namedValue) error {
return s.c.lastError()
}
bindIndices := make([][3]int, len(args))
prefixes := []string{":", "@", "$"}
for i, v := range args {
bindIndices[i][0] = args[i].Ordinal
if v.Name != "" {
cname := C.CString(":" + v.Name)
args[i].Ordinal = int(C.sqlite3_bind_parameter_index(s.s, cname))
C.free(unsafe.Pointer(cname))
for j := range prefixes {
cname := C.CString(prefixes[j] + v.Name)
bindIndices[i][j] = int(C.sqlite3_bind_parameter_index(s.s, cname))
C.free(unsafe.Pointer(cname))
}
args[i].Ordinal = bindIndices[i][0]
}
}
for _, arg := range args {
n := C.int(arg.Ordinal)
switch v := arg.Value.(type) {
case nil:
rv = C.sqlite3_bind_null(s.s, n)
case string:
if len(v) == 0 {
rv = C._sqlite3_bind_text(s.s, n, (*C.char)(unsafe.Pointer(&placeHolder[0])), C.int(0))
} else {
b := []byte(v)
for i, arg := range args {
for j := range bindIndices[i] {
if bindIndices[i][j] == 0 {
continue
}
n := C.int(bindIndices[i][j])
switch v := arg.Value.(type) {
case nil:
rv = C.sqlite3_bind_null(s.s, n)
case string:
if len(v) == 0 {
rv = C._sqlite3_bind_text(s.s, n, (*C.char)(unsafe.Pointer(&placeHolder[0])), C.int(0))
} else {
b := []byte(v)
rv = C._sqlite3_bind_text(s.s, n, (*C.char)(unsafe.Pointer(&b[0])), C.int(len(b)))
}
case int64:
rv = C.sqlite3_bind_int64(s.s, n, C.sqlite3_int64(v))
case bool:
if v {
rv = C.sqlite3_bind_int(s.s, n, 1)
} else {
rv = C.sqlite3_bind_int(s.s, n, 0)
}
case float64:
rv = C.sqlite3_bind_double(s.s, n, C.double(v))
case []byte:
if v == nil {
rv = C.sqlite3_bind_null(s.s, n)
} else {
ln := len(v)
if ln == 0 {
v = placeHolder
}
rv = C._sqlite3_bind_blob(s.s, n, unsafe.Pointer(&v[0]), C.int(ln))
}
case time.Time:
b := []byte(v.Format(SQLiteTimestampFormats[0]))
rv = C._sqlite3_bind_text(s.s, n, (*C.char)(unsafe.Pointer(&b[0])), C.int(len(b)))
}
case int64:
rv = C.sqlite3_bind_int64(s.s, n, C.sqlite3_int64(v))
case bool:
if v {
rv = C.sqlite3_bind_int(s.s, n, 1)
} else {
rv = C.sqlite3_bind_int(s.s, n, 0)
if rv != C.SQLITE_OK {
return s.c.lastError()
}
case float64:
rv = C.sqlite3_bind_double(s.s, n, C.double(v))
case []byte:
if v == nil {
rv = C.sqlite3_bind_null(s.s, n)
} else {
ln := len(v)
if ln == 0 {
v = placeHolder
}
rv = C._sqlite3_bind_blob(s.s, n, unsafe.Pointer(&v[0]), C.int(ln))
}
case time.Time:
b := []byte(v.Format(SQLiteTimestampFormats[0]))
rv = C._sqlite3_bind_text(s.s, n, (*C.char)(unsafe.Pointer(&b[0])), C.int(len(b)))
}
if rv != C.SQLITE_OK {
return s.c.lastError()
}
}
return nil
@ -1875,6 +1918,14 @@ func (s *SQLiteStmt) Exec(args []driver.Value) (driver.Result, error) {
return s.exec(context.Background(), list)
}
func isInterruptErr(err error) bool {
sqliteErr, ok := err.(Error)
if ok {
return sqliteErr.Code == ErrInterrupt
}
return false
}
// exec executes a query that doesn't return rows. Attempts to honor context timeout.
func (s *SQLiteStmt) exec(ctx context.Context, args []namedValue) (driver.Result, error) {
if ctx.Done() == nil {
@ -1890,19 +1941,22 @@ func (s *SQLiteStmt) exec(ctx context.Context, args []namedValue) (driver.Result
r, err := s.execSync(args)
resultCh <- result{r, err}
}()
var rv result
select {
case rv := <- resultCh:
return rv.r, rv.err
case rv = <-resultCh:
case <-ctx.Done():
select {
case <-resultCh: // no need to interrupt
case rv = <-resultCh: // no need to interrupt, operation completed in db
default:
// this is still racy and can be no-op if executed between sqlite3_* calls in execSync.
C.sqlite3_interrupt(s.c.db)
<-resultCh // ensure goroutine completed
rv = <-resultCh // wait for goroutine completed
if isInterruptErr(rv.err) {
return nil, ctx.Err()
}
}
return nil, ctx.Err()
}
return rv.r, rv.err
}
func (s *SQLiteStmt) execSync(args []namedValue) (driver.Result, error) {
@ -1992,7 +2046,7 @@ func (rc *SQLiteRows) Next(dest []driver.Value) error {
resultCh <- rc.nextSyncLocked(dest)
}()
select {
case err := <- resultCh:
case err := <-resultCh:
return err
case <-rc.ctx.Done():
select {
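
The rewritten bind() above resolves named arguments against the ":", "@", and "$" placeholder prefixes and re-numbers ordinals per statement. A sketch of passing named parameters through database/sql; the table and column names are hypothetical:

package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if _, err := db.Exec(`CREATE TABLE events (kind TEXT, ts TEXT)`); err != nil {
		log.Fatal(err)
	}
	// The same named argument can be referenced with any of the supported
	// placeholder prefixes.
	if _, err := db.Exec(`INSERT INTO events (kind, ts) VALUES (:kind, @ts)`,
		sql.Named("kind", "login"),
		sql.Named("ts", "2020-10-28"),
	); err != nil {
		log.Fatal(err)
	}

	var n int
	if err := db.QueryRow(`SELECT count(*) FROM events WHERE kind = $kind`,
		sql.Named("kind", "login"),
	).Scan(&n); err != nil {
		log.Fatal(err)
	}
	log.Printf("rows: %d", n)
}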


@ -14,5 +14,6 @@ package sqlite3
#cgo darwin CFLAGS: -I/usr/local/opt/sqlite/include
#cgo openbsd LDFLAGS: -lsqlite3
#cgo solaris LDFLAGS: -lsqlite3
#cgo windows LDFLAGS: -lsqlite3
*/
import "C"


@ -28,12 +28,9 @@ func (c *SQLiteConn) loadExtensions(extensions []string) error {
}
for _, extension := range extensions {
cext := C.CString(extension)
defer C.free(unsafe.Pointer(cext))
rv = C.sqlite3_load_extension(c.db, cext, nil, nil)
if rv != C.SQLITE_OK {
if err := c.loadExtension(extension, nil); err != nil {
C.sqlite3_enable_load_extension(c.db, 0)
return errors.New(C.GoString(C.sqlite3_errmsg(c.db)))
return err
}
}
@ -41,6 +38,7 @@ func (c *SQLiteConn) loadExtensions(extensions []string) error {
if rv != C.SQLITE_OK {
return errors.New(C.GoString(C.sqlite3_errmsg(c.db)))
}
return nil
}
@ -51,14 +49,9 @@ func (c *SQLiteConn) LoadExtension(lib string, entry string) error {
return errors.New(C.GoString(C.sqlite3_errmsg(c.db)))
}
clib := C.CString(lib)
defer C.free(unsafe.Pointer(clib))
centry := C.CString(entry)
defer C.free(unsafe.Pointer(centry))
rv = C.sqlite3_load_extension(c.db, clib, centry, nil)
if rv != C.SQLITE_OK {
return errors.New(C.GoString(C.sqlite3_errmsg(c.db)))
if err := c.loadExtension(lib, &entry); err != nil {
C.sqlite3_enable_load_extension(c.db, 0)
return err
}
rv = C.sqlite3_enable_load_extension(c.db, 0)
@ -68,3 +61,24 @@ func (c *SQLiteConn) LoadExtension(lib string, entry string) error {
return nil
}
func (c *SQLiteConn) loadExtension(lib string, entry *string) error {
clib := C.CString(lib)
defer C.free(unsafe.Pointer(clib))
var centry *C.char
if entry != nil {
centry = C.CString(*entry)
defer C.free(unsafe.Pointer(centry))
}
var errMsg *C.char
defer C.sqlite3_free(unsafe.Pointer(errMsg))
rv := C.sqlite3_load_extension(c.db, clib, centry, &errMsg)
if rv != C.SQLITE_OK {
return errors.New(C.GoString(errMsg))
}
return nil
}


@ -288,10 +288,13 @@ type InfoOrderBy struct {
}
func constraints(info *C.sqlite3_index_info) []InfoConstraint {
l := info.nConstraint
slice := (*[1 << 30]C.struct_sqlite3_index_constraint)(unsafe.Pointer(info.aConstraint))[:l:l]
slice := *(*[]C.struct_sqlite3_index_constraint)(unsafe.Pointer(&reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(info.aConstraint)),
Len: int(info.nConstraint),
Cap: int(info.nConstraint),
}))
cst := make([]InfoConstraint, 0, l)
cst := make([]InfoConstraint, 0, len(slice))
for _, c := range slice {
var usable bool
if c.usable > 0 {
@ -307,10 +310,13 @@ func constraints(info *C.sqlite3_index_info) []InfoConstraint {
}
func orderBys(info *C.sqlite3_index_info) []InfoOrderBy {
l := info.nOrderBy
slice := (*[1 << 30]C.struct_sqlite3_index_orderby)(unsafe.Pointer(info.aOrderBy))[:l:l]
slice := *(*[]C.struct_sqlite3_index_orderby)(unsafe.Pointer(&reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(info.aOrderBy)),
Len: int(info.nOrderBy),
Cap: int(info.nOrderBy),
}))
ob := make([]InfoOrderBy, 0, l)
ob := make([]InfoOrderBy, 0, len(slice))
for _, c := range slice {
var desc bool
if c.desc > 0 {
@ -347,7 +353,7 @@ func mPrintf(format, arg string) *C.char {
//export goMInit
func goMInit(db, pClientData unsafe.Pointer, argc C.int, argv **C.char, pzErr **C.char, isCreate C.int) C.uintptr_t {
m := lookupHandle(uintptr(pClientData)).(*sqliteModule)
m := lookupHandle(pClientData).(*sqliteModule)
if m.c.db != (*C.sqlite3)(db) {
*pzErr = mPrintf("%s", "Inconsistent db handles")
return 0
@ -373,12 +379,12 @@ func goMInit(db, pClientData unsafe.Pointer, argc C.int, argv **C.char, pzErr **
}
vt := sqliteVTab{m, vTab}
*pzErr = nil
return C.uintptr_t(newHandle(m.c, &vt))
return C.uintptr_t(uintptr(newHandle(m.c, &vt)))
}
//export goVRelease
func goVRelease(pVTab unsafe.Pointer, isDestroy C.int) *C.char {
vt := lookupHandle(uintptr(pVTab)).(*sqliteVTab)
vt := lookupHandle(pVTab).(*sqliteVTab)
var err error
if isDestroy == 1 {
err = vt.vTab.Destroy()
@ -393,7 +399,7 @@ func goVRelease(pVTab unsafe.Pointer, isDestroy C.int) *C.char {
//export goVOpen
func goVOpen(pVTab unsafe.Pointer, pzErr **C.char) C.uintptr_t {
vt := lookupHandle(uintptr(pVTab)).(*sqliteVTab)
vt := lookupHandle(pVTab).(*sqliteVTab)
vTabCursor, err := vt.vTab.Open()
if err != nil {
*pzErr = mPrintf("%s", err.Error())
@ -401,12 +407,12 @@ func goVOpen(pVTab unsafe.Pointer, pzErr **C.char) C.uintptr_t {
}
vtc := sqliteVTabCursor{vt, vTabCursor}
*pzErr = nil
return C.uintptr_t(newHandle(vt.module.c, &vtc))
return C.uintptr_t(uintptr(newHandle(vt.module.c, &vtc)))
}
//export goVBestIndex
func goVBestIndex(pVTab unsafe.Pointer, icp unsafe.Pointer) *C.char {
vt := lookupHandle(uintptr(pVTab)).(*sqliteVTab)
vt := lookupHandle(pVTab).(*sqliteVTab)
info := (*C.sqlite3_index_info)(icp)
csts := constraints(info)
res, err := vt.vTab.BestIndex(csts, orderBys(info))
@ -418,13 +424,17 @@ func goVBestIndex(pVTab unsafe.Pointer, icp unsafe.Pointer) *C.char {
}
// Get a pointer to constraint_usage struct so we can update in place.
l := info.nConstraint
s := (*[1 << 30]C.struct_sqlite3_index_constraint_usage)(unsafe.Pointer(info.aConstraintUsage))[:l:l]
slice := *(*[]C.struct_sqlite3_index_constraint_usage)(unsafe.Pointer(&reflect.SliceHeader{
Data: uintptr(unsafe.Pointer(info.aConstraintUsage)),
Len: int(info.nConstraint),
Cap: int(info.nConstraint),
}))
index := 1
for i := C.int(0); i < info.nConstraint; i++ {
for i := range slice {
if res.Used[i] {
s[i].argvIndex = C.int(index)
s[i].omit = C.uchar(1)
slice[i].argvIndex = C.int(index)
slice[i].omit = C.uchar(1)
index++
}
}
@ -445,7 +455,7 @@ func goVBestIndex(pVTab unsafe.Pointer, icp unsafe.Pointer) *C.char {
//export goVClose
func goVClose(pCursor unsafe.Pointer) *C.char {
vtc := lookupHandle(uintptr(pCursor)).(*sqliteVTabCursor)
vtc := lookupHandle(pCursor).(*sqliteVTabCursor)
err := vtc.vTabCursor.Close()
if err != nil {
return mPrintf("%s", err.Error())
@ -455,13 +465,13 @@ func goVClose(pCursor unsafe.Pointer) *C.char {
//export goMDestroy
func goMDestroy(pClientData unsafe.Pointer) {
m := lookupHandle(uintptr(pClientData)).(*sqliteModule)
m := lookupHandle(pClientData).(*sqliteModule)
m.module.DestroyModule()
}
//export goVFilter
func goVFilter(pCursor unsafe.Pointer, idxNum C.int, idxName *C.char, argc C.int, argv **C.sqlite3_value) *C.char {
vtc := lookupHandle(uintptr(pCursor)).(*sqliteVTabCursor)
vtc := lookupHandle(pCursor).(*sqliteVTabCursor)
args := (*[(math.MaxInt32 - 1) / unsafe.Sizeof((*C.sqlite3_value)(nil))]*C.sqlite3_value)(unsafe.Pointer(argv))[:argc:argc]
vals := make([]interface{}, 0, argc)
for _, v := range args {
@ -480,7 +490,7 @@ func goVFilter(pCursor unsafe.Pointer, idxNum C.int, idxName *C.char, argc C.int
//export goVNext
func goVNext(pCursor unsafe.Pointer) *C.char {
vtc := lookupHandle(uintptr(pCursor)).(*sqliteVTabCursor)
vtc := lookupHandle(pCursor).(*sqliteVTabCursor)
err := vtc.vTabCursor.Next()
if err != nil {
return mPrintf("%s", err.Error())
@ -490,7 +500,7 @@ func goVNext(pCursor unsafe.Pointer) *C.char {
//export goVEof
func goVEof(pCursor unsafe.Pointer) C.int {
vtc := lookupHandle(uintptr(pCursor)).(*sqliteVTabCursor)
vtc := lookupHandle(pCursor).(*sqliteVTabCursor)
err := vtc.vTabCursor.EOF()
if err {
return 1
@ -500,7 +510,7 @@ func goVEof(pCursor unsafe.Pointer) C.int {
//export goVColumn
func goVColumn(pCursor, cp unsafe.Pointer, col C.int) *C.char {
vtc := lookupHandle(uintptr(pCursor)).(*sqliteVTabCursor)
vtc := lookupHandle(pCursor).(*sqliteVTabCursor)
c := (*SQLiteContext)(cp)
err := vtc.vTabCursor.Column(c, int(col))
if err != nil {
@ -511,7 +521,7 @@ func goVColumn(pCursor, cp unsafe.Pointer, col C.int) *C.char {
//export goVRowid
func goVRowid(pCursor unsafe.Pointer, pRowid *C.sqlite3_int64) *C.char {
vtc := lookupHandle(uintptr(pCursor)).(*sqliteVTabCursor)
vtc := lookupHandle(pCursor).(*sqliteVTabCursor)
rowid, err := vtc.vTabCursor.Rowid()
if err != nil {
return mPrintf("%s", err.Error())
@ -522,7 +532,7 @@ func goVRowid(pCursor unsafe.Pointer, pRowid *C.sqlite3_int64) *C.char {
//export goVUpdate
func goVUpdate(pVTab unsafe.Pointer, argc C.int, argv **C.sqlite3_value, pRowid *C.sqlite3_int64) *C.char {
vt := lookupHandle(uintptr(pVTab)).(*sqliteVTab)
vt := lookupHandle(pVTab).(*sqliteVTab)
var tname string
if n, ok := vt.vTab.(interface {
@ -642,7 +652,7 @@ func (c *SQLiteConn) CreateModule(moduleName string, module Module) error {
mname := C.CString(moduleName)
defer C.free(unsafe.Pointer(mname))
udm := sqliteModule{c, moduleName, module}
rv := C._sqlite3_create_module(c.db, mname, C.uintptr_t(newHandle(c, &udm)))
rv := C._sqlite3_create_module(c.db, mname, C.uintptr_t(uintptr(newHandle(c, &udm))))
if rv != C.SQLITE_OK {
return c.lastError()
}
@ -215,7 +215,6 @@ func addTraceMapping(connHandle uintptr, traceConf TraceConfig) {
traceConf, connHandle, oldEntryCopy.config))
}
traceMap[connHandle] = traceMapEntry{config: traceConf}
fmt.Printf("Added trace config %v: handle 0x%x.\n", traceConf, connHandle)
}
func lookupTraceMapping(connHandle uintptr) (TraceConfig, bool) {
@ -234,7 +233,6 @@ func popTraceMapping(connHandle uintptr) (TraceConfig, bool) {
entryCopy, found := traceMap[connHandle]
if found {
delete(traceMap, connHandle)
fmt.Printf("Pop handle 0x%x: deleted trace config %v.\n", connHandle, entryCopy.config)
}
return entryCopy.config, found
}
@ -325,6 +325,17 @@ struct sqlite3_api_routines {
int (*value_frombind)(sqlite3_value*);
/* Version 3.30.0 and later */
int (*drop_modules)(sqlite3*,const char**);
/* Version 3.31.0 and later */
sqlite3_int64 (*hard_heap_limit64)(sqlite3_int64);
const char *(*uri_key)(const char*,int);
const char *(*filename_database)(const char*);
const char *(*filename_journal)(const char*);
const char *(*filename_wal)(const char*);
/* Version 3.32.0 and later */
char *(*create_filename)(const char*,const char*,const char*,
int,const char**);
void (*free_filename)(char*);
sqlite3_file *(*database_file_object)(const char*);
};
/*
@ -615,10 +626,20 @@ typedef int (*sqlite3_loadext_entry)(
/* Version 3.26.0 and later */
#define sqlite3_normalized_sql sqlite3_api->normalized_sql
/* Version 3.28.0 and later */
#define sqlite3_stmt_isexplain sqlite3_api->isexplain
#define sqlite3_value_frombind sqlite3_api->frombind
#define sqlite3_stmt_isexplain sqlite3_api->stmt_isexplain
#define sqlite3_value_frombind sqlite3_api->value_frombind
/* Version 3.30.0 and later */
#define sqlite3_drop_modules sqlite3_api->drop_modules
/* Version 3.31.0 and later */
#define sqlite3_hard_heap_limit64 sqlite3_api->hard_heap_limit64
#define sqlite3_uri_key sqlite3_api->uri_key
#define sqlite3_filename_database sqlite3_api->filename_database
#define sqlite3_filename_journal sqlite3_api->filename_journal
#define sqlite3_filename_wal sqlite3_api->filename_wal
/* Version 3.32.0 and later */
#define sqlite3_create_filename sqlite3_api->create_filename
#define sqlite3_free_filename sqlite3_api->free_filename
#define sqlite3_database_file_object sqlite3_api->database_file_object
#endif /* !defined(SQLITE_CORE) && !defined(SQLITE_OMIT_LOAD_EXTENSION) */
#if !defined(SQLITE_CORE) && !defined(SQLITE_OMIT_LOAD_EXTENSION)
@ -13,14 +13,25 @@ import (
"errors"
)
func init() {
sql.Register("sqlite3", &SQLiteDriverMock{})
}
type SQLiteDriverMock struct{}
var errorMsg = errors.New("Binary was compiled with 'CGO_ENABLED=0', go-sqlite3 requires cgo to work. This is a stub")
func (SQLiteDriverMock) Open(s string) (driver.Conn, error) {
return nil, errorMsg
func init() {
sql.Register("sqlite3", &SQLiteDriver{})
}
type (
SQLiteDriver struct {
Extensions []string
ConnectHook func(*SQLiteConn) error
}
SQLiteConn struct{}
)
func (SQLiteDriver) Open(s string) (driver.Conn, error) { return nil, errorMsg }
func (c *SQLiteConn) RegisterAggregator(string, interface{}, bool) error { return errorMsg }
func (c *SQLiteConn) RegisterAuthorizer(func(int, string, string, string) int) {}
func (c *SQLiteConn) RegisterCollation(string, func(string, string) int) error { return errorMsg }
func (c *SQLiteConn) RegisterCommitHook(func() int) {}
func (c *SQLiteConn) RegisterFunc(string, interface{}, bool) error { return errorMsg }
func (c *SQLiteConn) RegisterRollbackHook(func()) {}
func (c *SQLiteConn) RegisterUpdateHook(func(int, string, string, int64)) {}
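For context on how the rewritten stub behaves at runtime: sql.Open still succeeds because the driver name is registered, and the cgo error only surfaces when a connection is actually attempted. A small sketch, assuming a binary built with CGO_ENABLED=0 so that this stub is what gets compiled in:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3" // resolves to the stub above when built with CGO_ENABLED=0
)

func main() {
	// sql.Open only validates the driver name, so it succeeds even with the stub.
	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	// The first real connection attempt calls the stub's Open and surfaces its error.
	if err := db.Ping(); err != nil {
		fmt.Println("sqlite3 unavailable:", err)
	}
}
```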
@ -20,33 +20,32 @@ const (
)
var (
columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"
columns = "kv.id AS theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"
revSQL = `
SELECT rkv.id
FROM kine rkv
ORDER BY rkv.id
DESC LIMIT 1`
SELECT MAX(rkv.id) AS id
FROM kine AS rkv`
compactRevSQL = `
SELECT crkv.prev_revision
FROM kine crkv
WHERE crkv.name = 'compact_rev_key'
ORDER BY crkv.id DESC LIMIT 1`
SELECT MAX(crkv.prev_revision) AS prev_revision
FROM kine AS crkv
WHERE crkv.name = 'compact_rev_key'`
idOfKey = `
AND mkv.id <= ? AND mkv.id > (
SELECT ikv.id
FROM kine ikv
AND
mkv.id <= ? AND
mkv.id > (
SELECT MAX(ikv.id) AS id
FROM kine AS ikv
WHERE
ikv.name = ? AND
ikv.id <= ?
ORDER BY ikv.id DESC LIMIT 1)`
ikv.id <= ?)`
listSQL = fmt.Sprintf(`SELECT (%s), (%s), %s
FROM kine kv
listSQL = fmt.Sprintf(`
SELECT (%s), (%s), %s
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
SELECT MAX(mkv.id) AS id
FROM kine AS mkv
WHERE
mkv.name LIKE ?
%%s
@ -88,6 +87,7 @@ type Generic struct {
CountSQL string
AfterSQL string
DeleteSQL string
CompactSQL string
UpdateCompactSQL string
InsertSQL string
FillSQL string
@ -138,7 +138,7 @@ func (d *Generic) Migrate(ctx context.Context) {
}
}
func configureConnectionPooling(connPoolConfig ConnectionPoolConfig, db *sql.DB) {
func configureConnectionPooling(connPoolConfig ConnectionPoolConfig, db *sql.DB, driverName string) {
// behavior copied from database/sql - zero means defaultMaxIdleConns; negative means 0
if connPoolConfig.MaxIdle < 0 {
connPoolConfig.MaxIdle = 0
@ -146,7 +146,7 @@ func configureConnectionPooling(connPoolConfig ConnectionPoolConfig, db *sql.DB)
connPoolConfig.MaxIdle = defaultMaxIdleConns
}
logrus.Infof("Configuring DB connection pooling: maxIdleConns=%d, maxOpenConns=%d, connMaxLifetime=%s", connPoolConfig.MaxIdle, connPoolConfig.MaxOpen, connPoolConfig.MaxLifetime)
logrus.Infof("Configuring %s database connection pooling: maxIdleConns=%d, maxOpenConns=%d, connMaxLifetime=%s", driverName, connPoolConfig.MaxIdle, connPoolConfig.MaxOpen, connPoolConfig.MaxLifetime)
db.SetMaxIdleConns(connPoolConfig.MaxIdle)
db.SetMaxOpenConns(connPoolConfig.MaxOpen)
db.SetConnMaxLifetime(connPoolConfig.MaxLifetime)
@ -188,7 +188,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
}
}
configureConnectionPooling(connPoolConfig, db)
configureConnectionPooling(connPoolConfig, db, driverName)
return &Generic{
DB: db,
@ -196,7 +196,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
GetRevisionSQL: q(fmt.Sprintf(`
SELECT
0, 0, %s
FROM kine kv
FROM kine AS kv
WHERE kv.id = ?`, columns), paramCharacter, numbered),
GetCurrentSQL: q(fmt.Sprintf(listSQL, ""), paramCharacter, numbered),
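The reworked listSQL above escapes one placeholder as %%s so that it survives the first fmt.Sprintf and can be filled in later, which is what the GetCurrentSQL line does with fmt.Sprintf(listSQL, ""). A simplified sketch of that two-stage templating follows; the column list and the extra filter here are made-up placeholders, not kine's real SQL fragments.

```go
package main

import "fmt"

func main() {
	// %s slots are filled immediately; %%s survives the first Sprintf as a
	// literal %s so an extra filter can be spliced in by a second Sprintf.
	listSQL := fmt.Sprintf(`
		SELECT %s
		FROM kine AS kv
		WHERE kv.name LIKE ?
		%%s
		ORDER BY kv.id ASC`, "kv.id, kv.name, kv.value")

	getCurrentSQL := fmt.Sprintf(listSQL, "")              // no extra clause
	listRevisionSQL := fmt.Sprintf(listSQL, "AND kv.id <= ?") // hypothetical revision-bounded variant

	fmt.Println(getCurrentSQL)
	fmt.Println(listRevisionSQL)
}
```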
@ -211,15 +211,15 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig
AfterSQL: q(fmt.Sprintf(`
SELECT (%s), (%s), %s
FROM kine kv
FROM kine AS kv
WHERE
kv.name LIKE ? AND
kv.id > ?
ORDER BY kv.id ASC`, revSQL, compactRevSQL, columns), paramCharacter, numbered),
DeleteSQL: q(`
DELETE FROM kine
WHERE id = ?`, paramCharacter, numbered),
DELETE FROM kine AS kv
WHERE kv.id = ?`, paramCharacter, numbered),
UpdateCompactSQL: q(`
UPDATE kine
@ -277,15 +277,26 @@ func (d *Generic) GetCompactRevision(ctx context.Context) (int64, error) {
}
func (d *Generic) SetCompactRevision(ctx context.Context, revision int64) error {
logrus.Tracef("SETCOMPACTREVISION %v", revision)
_, err := d.execute(ctx, d.UpdateCompactSQL, revision)
return err
}
func (d *Generic) Compact(ctx context.Context, revision int64) (int64, error) {
logrus.Tracef("COMPACT %v", revision)
res, err := d.execute(ctx, d.CompactSQL, revision, revision)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func (d *Generic) GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) {
return d.query(ctx, d.GetRevisionSQL, revision)
}
func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error {
logrus.Tracef("DELETEREVISION %v", revision)
_, err := d.execute(ctx, d.DeleteSQL, revision)
return err
}
@ -0,0 +1,109 @@
package generic
import (
"context"
"database/sql"
"github.com/sirupsen/logrus"
)
type Tx struct {
x *sql.Tx
d *Generic
}
func (d *Generic) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error) {
logrus.Tracef("TX BEGIN")
x, err := d.DB.BeginTx(ctx, opts)
if err != nil {
return nil, err
}
return &Tx{
x: x,
d: d,
}, nil
}
func (t *Tx) Commit() error {
logrus.Tracef("TX COMMIT")
return t.x.Commit()
}
func (t *Tx) MustCommit() {
if err := t.Commit(); err != nil {
logrus.Fatalf("Transaction commit failed: %v", err)
}
}
func (t *Tx) Rollback() error {
logrus.Tracef("TX ROLLBACK")
return t.x.Rollback()
}
func (t *Tx) MustRollback() {
if err := t.Rollback(); err != nil {
if err != sql.ErrTxDone {
logrus.Fatalf("Transaction rollback failed: %v", err)
}
}
}
func (t *Tx) GetCompactRevision(ctx context.Context) (int64, error) {
var id int64
row := t.queryRow(ctx, compactRevSQL)
err := row.Scan(&id)
if err == sql.ErrNoRows {
return 0, nil
}
return id, err
}
func (t *Tx) SetCompactRevision(ctx context.Context, revision int64) error {
logrus.Tracef("TX SETCOMPACTREVISION %v", revision)
_, err := t.execute(ctx, t.d.UpdateCompactSQL, revision)
return err
}
func (t *Tx) Compact(ctx context.Context, revision int64) (int64, error) {
logrus.Tracef("TX COMPACT %v", revision)
res, err := t.execute(ctx, t.d.CompactSQL, revision, revision)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func (t *Tx) GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) {
return t.query(ctx, t.d.GetRevisionSQL, revision)
}
func (t *Tx) DeleteRevision(ctx context.Context, revision int64) error {
logrus.Tracef("TX DELETEREVISION %v", revision)
_, err := t.execute(ctx, t.d.DeleteSQL, revision)
return err
}
func (t *Tx) CurrentRevision(ctx context.Context) (int64, error) {
var id int64
row := t.queryRow(ctx, revSQL)
err := row.Scan(&id)
if err == sql.ErrNoRows {
return 0, nil
}
return id, err
}
func (t *Tx) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) {
logrus.Tracef("TX QUERY %v : %s", args, Stripped(sql))
return t.x.QueryContext(ctx, sql, args...)
}
func (t *Tx) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row {
logrus.Tracef("TX QUERY ROW %v : %s", args, Stripped(sql))
return t.x.QueryRowContext(ctx, sql, args...)
}
func (t *Tx) execute(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) {
logrus.Tracef("TX EXEC %v : %s", args, Stripped(sql))
return t.x.ExecContext(ctx, sql, args...)
}
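The new Tx type wraps the compaction-related statements in a single database transaction. A minimal sketch of how these helpers fit together, assuming an already-opened *generic.Generic; compactOnce is a hypothetical name, and the real consumer is SQLLog.compact later in this diff.

```go
package main

import (
	"context"
	"database/sql"

	"github.com/rancher/kine/pkg/drivers/generic"
)

// compactOnce compacts up to rev and records the compact revision inside one
// serializable transaction, rolling back automatically if anything fails.
func compactOnce(ctx context.Context, d *generic.Generic, rev int64) (int64, error) {
	t, err := d.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
	if err != nil {
		return 0, err
	}
	// MustRollback is a no-op after a successful Commit (sql.ErrTxDone is ignored).
	defer t.MustRollback()

	deleted, err := t.Compact(ctx, rev)
	if err != nil {
		return 0, err
	}
	if err := t.SetCompactRevision(ctx, rev); err != nil {
		return 0, err
	}
	return deleted, t.Commit()
}

func main() {} // placeholder so the sketch compiles standalone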
@ -11,6 +11,7 @@ import (
"github.com/rancher/kine/pkg/logstructured/sqllog"
"github.com/rancher/kine/pkg/server"
"github.com/rancher/kine/pkg/tls"
"github.com/sirupsen/logrus"
)
const (
@ -20,7 +21,7 @@ const (
var (
schema = []string{
`create table if not exists kine
`CREATE TABLE IF NOT EXISTS kine
(
id INTEGER AUTO_INCREMENT,
name VARCHAR(630),
@ -33,11 +34,13 @@ var (
old_value MEDIUMBLOB,
PRIMARY KEY (id)
);`,
`CREATE INDEX kine_name_index ON kine (name)`,
`CREATE INDEX kine_name_id_index ON kine (name,id)`,
`CREATE INDEX kine_id_deleted_index ON kine (id,deleted)`,
`CREATE INDEX kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
}
nameIdx = "create index kine_name_index on kine (name)"
nameIDIdx = "create index kine_name_id_index on kine (name,id)"
revisionIdx = "create unique index kine_name_prev_revision_uindex on kine (name, prev_revision)"
createDB = "create database if not exists "
createDB = "CREATE DATABASE IF NOT EXISTS "
)
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
@ -63,7 +66,26 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
if err != nil {
return nil, err
}
dialect.LastInsertID = true
dialect.CompactSQL = `
DELETE kv FROM kine AS kv
INNER JOIN (
SELECT kp.prev_revision AS id
FROM kine AS kp
WHERE
kp.prev_revision != 0 AND
kp.id <= ?
UNION
SELECT kd.id AS id
FROM kine AS kd
WHERE
kd.deleted != 0 AND
kd.id <= ?
) AS ks
ON
kv.id = ks.id AND
kv.name != 'compact_rev_key'`
dialect.TranslateErr = func(err error) error {
if err, ok := err.(*mysql.MySQLError); ok && err.Number == 1062 {
return server.ErrKeyExists
@ -79,24 +101,19 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
}
func setup(db *sql.DB) error {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
return err
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 {
return err
}
}
}
// check if duplicate indexes
indexes := []string{
nameIdx,
nameIDIdx,
revisionIdx}
for _, idx := range indexes {
err := createIndex(db, idx)
if err != nil {
return err
}
}
logrus.Infof("Database tables and indexes are up to date")
return nil
}
@ -156,13 +173,3 @@ func prepareDSN(dataSourceName string, tlsConfig *cryptotls.Config) (string, err
return parsedDSN, nil
}
func createIndex(db *sql.DB, indexStmt string) error {
_, err := db.Exec(indexStmt)
if err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 {
return err
}
}
return nil
}
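MySQL has no CREATE INDEX IF NOT EXISTS, so the schema loop above now treats error 1061 (ER_DUP_KEYNAME) as success instead of keeping the separate createIndex helper. A small sketch of that check in isolation; ignoreDuplicateIndex is an illustrative helper, not part of kine.

```go
package main

import (
	"fmt"

	"github.com/go-sql-driver/mysql"
)

// ignoreDuplicateIndex mirrors the check inlined into setup above: error 1061
// from re-running a CREATE INDEX just means the index already exists.
func ignoreDuplicateIndex(err error) error {
	if mysqlError, ok := err.(*mysql.MySQLError); ok && mysqlError.Number == 1061 {
		return nil
	}
	return err
}

func main() {
	dup := &mysql.MySQLError{Number: 1061, Message: "Duplicate key name 'kine_name_index'"}
	fmt.Println(ignoreDuplicateIndex(dup)) // <nil>
}
```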
@ -14,6 +14,7 @@ import (
"github.com/rancher/kine/pkg/logstructured/sqllog"
"github.com/rancher/kine/pkg/server"
"github.com/rancher/kine/pkg/tls"
"github.com/sirupsen/logrus"
)
const (
@ -22,7 +23,7 @@ const (
var (
schema = []string{
`create table if not exists kine
`CREATE TABLE IF NOT EXISTS kine
(
id SERIAL PRIMARY KEY,
name VARCHAR(630),
@ -36,9 +37,11 @@ var (
);`,
`CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`,
`CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`,
`CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`,
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
}
createDB = "create database "
createDB = "CREATE DATABASE "
)
func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoolConfig generic.ConnectionPoolConfig) (server.Backend, error) {
@ -55,6 +58,24 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
if err != nil {
return nil, err
}
dialect.CompactSQL = `
DELETE FROM kine AS kv
USING (
SELECT kp.prev_revision AS id
FROM kine AS kp
WHERE
kp.prev_revision != 0 AND
kp.id <= $1
UNION
SELECT kd.id AS id
FROM kine AS kd
WHERE
kd.deleted != 0 AND
kd.id <= $2
) AS ks
WHERE
kv.id = ks.id AND
kv.name != 'compact_rev_key'`
dialect.TranslateErr = func(err error) error {
if err, ok := err.(*pq.Error); ok && err.Code == "23505" {
return server.ErrKeyExists
@ -71,13 +92,17 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
}
func setup(db *sql.DB) error {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
return err
}
}
logrus.Infof("Database tables and indexes are up to date")
return nil
}
@ -110,7 +135,9 @@ func createDBIfNotExist(dataSourceName string) error {
return err
}
defer db.Close()
_, err = db.Exec(createDB + dbName + ";")
stmt := createDB + dbName + ";"
logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt))
_, err = db.Exec(stmt)
if err != nil {
return err
}
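Note that the Postgres CompactSQL above uses numbered $1/$2 placeholders, while the generic queries are written with ? and run through the q helper with paramCharacter and numbered (visible in the generic driver's Open). The sketch below is only an illustrative stand-in for that kind of translation, not kine's actual q implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// numberPlaceholders rewrites each ? as $1, $2, ... so a generic query can be
// handed to a driver that expects numbered parameters.
func numberPlaceholders(query, param string) string {
	out := &strings.Builder{}
	n := 0
	for _, r := range query {
		if r == '?' {
			n++
			fmt.Fprintf(out, "%s%d", param, n)
			continue
		}
		out.WriteRune(r)
	}
	return out.String()
}

func main() {
	fmt.Println(numberPlaceholders("DELETE FROM kine AS kv WHERE kv.id = ?", "$"))
	// DELETE FROM kine AS kv WHERE kv.id = $1
}
```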
@ -24,7 +24,7 @@ var (
schema = []string{
`CREATE TABLE IF NOT EXISTS kine
(
id INTEGER primary key autoincrement,
id INTEGER PRIMARY KEY AUTOINCREMENT,
name INTEGER,
created INTEGER,
deleted INTEGER,
@ -35,6 +35,9 @@ var (
old_value BLOB
)`,
`CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`,
`CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`,
`CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`,
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
}
)
@ -56,7 +59,25 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string, connPool
if err != nil {
return nil, nil, err
}
dialect.LastInsertID = true
dialect.CompactSQL = `
DELETE FROM kine AS kv
WHERE
kv.name != 'compact_rev_key' AND
kv.id IN (
SELECT kp.prev_revision AS id
FROM kine AS kp
WHERE
kp.prev_revision != 0 AND
kp.id <= ?
UNION
SELECT kd.id AS id
FROM kine AS kd
WHERE
kd.deleted != 0 AND
kd.id <= ?
)`
dialect.TranslateErr = func(err error) error {
if err, ok := err.(sqlite3.Error); ok && err.ExtendedCode == sqlite3.ErrConstraintUnique {
return server.ErrKeyExists
@ -82,21 +103,22 @@ func NewVariant(ctx context.Context, driverName, dataSourceName string, connPool
if err != nil {
return nil, nil, errors.Wrap(err, "setup db")
}
//if err := setup(dialect.DB); err != nil {
// return nil, nil, errors.Wrap(err, "setup db")
//}
dialect.Migrate(context.Background())
return logstructured.New(sqllog.New(dialect)), dialect, nil
}
func setup(db *sql.DB) error {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", generic.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
return err
}
}
logrus.Infof("Database tables and indexes are up to date")
return nil
}
@ -33,7 +33,12 @@ func (l *LogStructured) Start(ctx context.Context) error {
if err := l.log.Start(ctx); err != nil {
return err
}
l.Create(ctx, "/registry/health", []byte(`{"health":"true"}`), 0)
// See https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L97
if _, err := l.Create(ctx, "/registry/health", []byte(`{"health":"true"}`), 0); err != nil {
if err != server.ErrKeyExists {
logrus.Errorf("Failed to create health check key: %v", err)
}
}
go l.ttl(ctx)
return nil
}
@ -304,7 +309,9 @@ func (l *LogStructured) ttl(ctx context.Context) {
case <-time.After(time.Duration(event.KV.Lease) * time.Second):
}
mutex.Lock()
l.Delete(ctx, event.KV.Key, event.KV.ModRevision)
if _, _, _, err := l.Delete(ctx, event.KV.Key, event.KV.ModRevision); err != nil {
logrus.Errorf("failed to delete expired key: %v", err)
}
mutex.Unlock()
}(event)
}
@ -6,11 +6,21 @@ import (
"strings"
"time"
"github.com/pkg/errors"
"github.com/rancher/kine/pkg/broadcaster"
"github.com/rancher/kine/pkg/drivers/generic"
"github.com/rancher/kine/pkg/server"
"github.com/sirupsen/logrus"
)
const (
compactInterval = 5 * time.Minute
compactTimeout = 5 * time.Second
compactMinRetain = 1000
compactBatchSize = 1000
pollBatchSize = 500
)
type SQLLog struct {
d Dialect
broadcaster broadcaster.Broadcaster
@ -37,8 +47,10 @@ type Dialect interface {
DeleteRevision(ctx context.Context, revision int64) error
GetCompactRevision(ctx context.Context) (int64, error)
SetCompactRevision(ctx context.Context, revision int64) error
Compact(ctx context.Context, revision int64) (int64, error)
Fill(ctx context.Context, revision int64) error
IsFill(key string) bool
BeginTx(ctx context.Context, opts *sql.TxOptions) (*generic.Tx, error)
}
func (s *SQLLog) Start(ctx context.Context) (err error) {
@ -47,6 +59,8 @@ func (s *SQLLog) Start(ctx context.Context) (err error) {
}
func (s *SQLLog) compactStart(ctx context.Context) error {
logrus.Tracef("COMPACTSTART")
rows, err := s.d.After(ctx, "compact_rev_key", 0, 0)
if err != nil {
return err
@ -57,6 +71,8 @@ func (s *SQLLog) compactStart(ctx context.Context) error {
return err
}
logrus.Tracef("COMPACTSTART len(events)=%v", len(events))
if len(events) == 0 {
_, err := s.Append(ctx, &server.Event{
Create: true,
@ -70,6 +86,12 @@ func (s *SQLLog) compactStart(ctx context.Context) error {
return nil
}
t, err := s.d.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return err
}
defer t.MustRollback()
// this is to work around a bug in which we ended up with two compact_rev_key rows
maxRev := int64(0)
maxID := int64(0)
@ -78,26 +100,33 @@ func (s *SQLLog) compactStart(ctx context.Context) error {
maxRev = event.PrevKV.ModRevision
maxID = event.KV.ModRevision
}
logrus.Tracef("COMPACTSTART maxRev=%v maxID=%v", maxRev, maxID)
}
for _, event := range events {
logrus.Tracef("COMPACTSTART event.KV.ModRevision=%v maxID=%v", event.KV.ModRevision, maxID)
if event.KV.ModRevision == maxID {
continue
}
if err := s.d.DeleteRevision(ctx, event.KV.ModRevision); err != nil {
if err := t.DeleteRevision(ctx, event.KV.ModRevision); err != nil {
return err
}
}
return nil
return t.Commit()
}
func (s *SQLLog) compact() {
var (
nextEnd int64
)
t := time.NewTicker(5 * time.Minute)
nextEnd, _ = s.d.CurrentRevision(s.ctx)
// compactor periodically compacts historical versions of keys.
// It will compact keys with versions older than the given interval, but never within the last 1000 revisions.
// In other words, after compaction, the store will only contain key revisions set during the last interval.
// Any API call for older versions of keys will return an error.
// Interval is the time interval between each compaction. The first compaction happens after "interval".
// This logic is directly cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go
func (s *SQLLog) compactor(interval time.Duration) {
t := time.NewTicker(interval)
compactRev, _ := s.d.GetCompactRevision(s.ctx)
targetCompactRev, _ := s.d.CurrentRevision(s.ctx)
logrus.Tracef("COMPACT starting compactRev=%d targetCompactRev=%d", compactRev, targetCompactRev)
outer:
for {
@ -107,93 +136,106 @@ outer:
case <-t.C:
}
currentRev, err := s.d.CurrentRevision(s.ctx)
if err != nil {
logrus.Errorf("failed to get current revision: %v", err)
continue
}
// Break up the compaction into smaller batches to avoid locking the database with excessively
		// long transactions. When things are working normally, deletes should proceed quite quickly, but if
		// run against a database where compaction has stalled (see rancher/k3s#1311), it may take a long time
// (several hundred ms) just for the database to execute the subquery to select the revisions to delete.
end := nextEnd
nextEnd = currentRev
var (
iterCompactRev int64
compactedRev int64
currentRev int64
err error
)
cursor, err := s.d.GetCompactRevision(s.ctx)
if err != nil {
logrus.Errorf("failed to get compact revision: %v", err)
continue
}
iterCompactRev = compactRev
compactedRev = compactRev
// leave the last 1000
end = end - 1000
for iterCompactRev < targetCompactRev {
			// Move the iteration target compactBatchSize revisions forward, or
// just as far as we need to hit the compaction target if that would
// overshoot it.
iterCompactRev += compactBatchSize
if iterCompactRev > targetCompactRev {
iterCompactRev = targetCompactRev
}
savedCursor := cursor
// Purposefully start at the current and redo the current as
// it could have failed before actually compacting
for ; cursor <= end; cursor++ {
rows, err := s.d.GetRevision(s.ctx, cursor)
compactedRev, currentRev, err = s.compact(compactedRev, iterCompactRev)
if err != nil {
logrus.Errorf("failed to get revision %d: %v", cursor, err)
continue outer
}
_, _, events, err := RowsToEvents(rows)
if err != nil {
logrus.Errorf("failed to convert to events: %v", err)
continue outer
}
if len(events) == 0 {
continue
}
event := events[0]
if event.KV.Key == "compact_rev_key" {
// don't compact the compact key
continue
}
setRev := false
if event.PrevKV != nil && event.PrevKV.ModRevision != 0 {
if savedCursor != cursor {
if err := s.d.SetCompactRevision(s.ctx, cursor); err != nil {
logrus.Errorf("failed to record compact revision: %v", err)
continue outer
}
savedCursor = cursor
setRev = true
}
if err := s.d.DeleteRevision(s.ctx, event.PrevKV.ModRevision); err != nil {
logrus.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err)
continue outer
}
}
if event.Delete {
if !setRev && savedCursor != cursor {
if err := s.d.SetCompactRevision(s.ctx, cursor); err != nil {
logrus.Errorf("failed to record compact revision: %v", err)
continue outer
}
savedCursor = cursor
}
if err := s.d.DeleteRevision(s.ctx, cursor); err != nil {
logrus.Errorf("failed to delete current revision %d: %v", cursor, err)
// ErrCompacted indicates that no further work is necessary - either compactRev changed since the
// last iteration because another client has compacted, or the requested revision has already been compacted.
if err == server.ErrCompacted {
break
} else {
logrus.Errorf("Compact failed: %v", err)
continue outer
}
}
}
if savedCursor != cursor {
if err := s.d.SetCompactRevision(s.ctx, cursor); err != nil {
logrus.Errorf("failed to record compact revision: %v", err)
continue outer
}
}
// Record the final results for the outer loop
compactRev = compactedRev
targetCompactRev = currentRev
}
}
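The inner loop above advances the compaction target in compactBatchSize steps so that each transaction stays short. The batching arithmetic in isolation, as a standalone sketch:

```go
package main

import "fmt"

const compactBatchSize = 1000

// batches lists the intermediate targets the compactor loop above would use:
// step forward compactBatchSize revisions at a time, never overshooting the
// overall compaction target.
func batches(compactRev, targetCompactRev int64) []int64 {
	var out []int64
	iter := compactRev
	for iter < targetCompactRev {
		iter += compactBatchSize
		if iter > targetCompactRev {
			iter = targetCompactRev
		}
		out = append(out, iter)
	}
	return out
}

func main() {
	fmt.Println(batches(0, 2500)) // [1000 2000 2500]: three short transactions instead of one long one
}
```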
// compact removes deleted or replaced rows from the database. compactRev is the revision that was last compacted to.
// If this changes between compactions, we know that someone else has compacted and we don't need to do it.
// targetCompactRev is the revision that we should try to compact to. Upon success, the function returns the revision
// compacted to, and the revision that we should try to compact to next time (the current revision).
// This logic is directly cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go
func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64, error) {
ctx, cancel := context.WithTimeout(s.ctx, compactTimeout)
defer cancel()
t, err := s.d.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return compactRev, targetCompactRev, errors.Wrap(err, "failed to begin transaction")
}
defer t.MustRollback()
currentRev, err := t.CurrentRevision(s.ctx)
if err != nil {
return compactRev, targetCompactRev, errors.Wrap(err, "failed to get current revision")
}
dbCompactRev, err := t.GetCompactRevision(s.ctx)
if err != nil {
return compactRev, targetCompactRev, errors.Wrap(err, "failed to get compact revision")
}
if compactRev != dbCompactRev {
logrus.Tracef("COMPACT compact revision changed since last iteration: %d => %d", compactRev, dbCompactRev)
return dbCompactRev, currentRev, server.ErrCompacted
}
// Ensure that we never compact the most recent 1000 revisions
targetCompactRev = safeCompactRev(targetCompactRev, currentRev)
// Don't bother compacting to a revision that has already been compacted
if targetCompactRev <= compactRev {
logrus.Tracef("COMPACT revision %d has already been compacted", targetCompactRev)
return dbCompactRev, currentRev, server.ErrCompacted
}
logrus.Tracef("COMPACT compactRev=%d targetCompactRev=%d currentRev=%d", compactRev, targetCompactRev, currentRev)
start := time.Now()
deletedRows, err := t.Compact(s.ctx, targetCompactRev)
if err != nil {
return compactRev, targetCompactRev, errors.Wrapf(err, "failed to compact to revision %d", targetCompactRev)
}
if err := t.SetCompactRevision(s.ctx, targetCompactRev); err != nil {
return compactRev, targetCompactRev, errors.Wrap(err, "failed to record compact revision")
}
t.MustCommit()
logrus.Debugf("COMPACT deleted %d rows from %d revisions in %s - compacted to %d/%d", deletedRows, (targetCompactRev - compactRev), time.Since(start), targetCompactRev, currentRev)
return targetCompactRev, currentRev, nil
}
func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
return s.d.CurrentRevision(ctx)
}
@ -335,7 +377,7 @@ func (s *SQLLog) startWatch() (chan interface{}, error) {
c := make(chan interface{})
	// start compaction and polling at the same time so that the watch starts
// at the oldest revision, but compaction doesn't create gaps
go s.compact()
go s.compactor(compactInterval)
go s.poll(c, pollStart)
return c, nil
}
@ -366,7 +408,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
}
waitForMore = true
rows, err := s.d.After(s.ctx, "%", last, 500)
rows, err := s.d.After(s.ctx, "%", last, pollBatchSize)
if err != nil {
logrus.Errorf("fail to list latest changes: %v", err)
continue
@ -395,6 +437,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
			// Ensure that we are notifying events in a sequential fashion. For example, if we find row 4 before row 3,
			// we don't want to notify row 4 first, because row 3 would then essentially be dropped forever.
if event.KV.ModRevision != next {
logrus.Tracef("MODREVISION GAP: expected %v, got %v", next, event.KV.ModRevision)
if canSkipRevision(next, skip, skipTime) {
					// This situation should never happen, but we keep this fallback because, if it does happen
					// for unknown reasons, we don't want to pause all watches forever.
@ -448,7 +491,7 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
}
func canSkipRevision(rev, skip int64, skipTime time.Time) bool {
return rev == skip && time.Now().Sub(skipTime) > time.Second
return rev == skip && time.Since(skipTime) > time.Second
}
func (s *SQLLog) Count(ctx context.Context, prefix string) (int64, int64, error) {
@ -517,3 +560,15 @@ func scan(rows *sql.Rows, rev *int64, compact *int64, event *server.Event) error
*compact = c.Int64
return nil
}
// safeCompactRev ensures that we never compact the most recent 1000 revisions.
func safeCompactRev(targetCompactRev int64, currentRev int64) int64 {
safeRev := currentRev - compactMinRetain
if targetCompactRev < safeRev {
safeRev = targetCompactRev
}
if safeRev < 0 {
safeRev = 0
}
return safeRev
}
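To make the retention rule concrete, here is safeCompactRev copied out of the diff above with a few worked values; compactMinRetain is 1000 as defined at the top of this file.

```go
package main

import "fmt"

const compactMinRetain = 1000

// safeCompactRev ensures that we never compact the most recent 1000 revisions
// (copied from the diff above).
func safeCompactRev(targetCompactRev int64, currentRev int64) int64 {
	safeRev := currentRev - compactMinRetain
	if targetCompactRev < safeRev {
		safeRev = targetCompactRev
	}
	if safeRev < 0 {
		safeRev = 0
	}
	return safeRev
}

func main() {
	fmt.Println(safeCompactRev(5000, 5500)) // 4500: never compact into the newest 1000 revisions
	fmt.Println(safeCompactRev(3000, 5500)) // 3000: target is already safely old, use it as-is
	fmt.Println(safeCompactRev(100, 500))   // 0: fewer than 1000 revisions exist, so compact nothing
}
```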
@ -4,9 +4,11 @@ import (
"context"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/mvccpb"
)
func isCompact(txn *etcdserverpb.TxnRequest) bool {
// See https://github.com/kubernetes/kubernetes/blob/442a69c3bdf6fe8e525b05887e57d89db1e2f3a5/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact.go#L72
return len(txn.Compare) == 1 &&
txn.Compare[0].Target == etcdserverpb.Compare_VERSION &&
txn.Compare[0].Result == etcdserverpb.Compare_EQUAL &&
@ -18,14 +20,19 @@ func isCompact(txn *etcdserverpb.TxnRequest) bool {
}
func (l *LimitedServer) compact(ctx context.Context) (*etcdserverpb.TxnResponse, error) {
// return comparison failure so that the apiserver does not bother compacting
return &etcdserverpb.TxnResponse{
Header: &etcdserverpb.ResponseHeader{},
Succeeded: true,
Succeeded: false,
Responses: []*etcdserverpb.ResponseOp{
{
Response: &etcdserverpb.ResponseOp_ResponsePut{
ResponsePut: &etcdserverpb.PutResponse{
Response: &etcdserverpb.ResponseOp_ResponseRange{
ResponseRange: &etcdserverpb.RangeResponse{
Header: &etcdserverpb.ResponseHeader{},
Kvs: []*mvccpb.KeyValue{
&mvccpb.KeyValue{},
},
Count: 1,
},
},
},
6
vendor/modules.txt vendored
@ -687,7 +687,7 @@ github.com/konsorten/go-windows-terminal-sequences
github.com/kubernetes-sigs/cri-tools/cmd/crictl
github.com/kubernetes-sigs/cri-tools/pkg/common
github.com/kubernetes-sigs/cri-tools/pkg/version
# github.com/lib/pq v1.1.1
# github.com/lib/pq v1.8.0
## explicit
github.com/lib/pq
github.com/lib/pq/oid
@ -700,7 +700,7 @@ github.com/lithammer/dedent
github.com/mailru/easyjson/buffer
github.com/mailru/easyjson/jlexer
github.com/mailru/easyjson/jwriter
# github.com/mattn/go-sqlite3 v1.13.0
# github.com/mattn/go-sqlite3 v1.14.4
## explicit
github.com/mattn/go-sqlite3
# github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369
@ -836,7 +836,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm
github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces
github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1
github.com/rancher/helm-controller/pkg/helm
# github.com/rancher/kine v0.4.1
# github.com/rancher/kine v0.5.0
## explicit
github.com/rancher/kine/pkg/broadcaster
github.com/rancher/kine/pkg/client