diff --git a/Dockerfile.dapper b/Dockerfile.dapper index c4a1e59f63..1f46fe6620 100644 --- a/Dockerfile.dapper +++ b/Dockerfile.dapper @@ -29,7 +29,7 @@ ENV SELINUX $SELINUX ARG DQLITE=true ENV DQLITE $DQLITE -COPY --from=rancher/dqlite-build:v1.3.1-r1 /dist/artifacts /usr/src/ +COPY --from=rancher/dqlite-build:v1.4.1-r1 /dist/artifacts /usr/src/ RUN if [ "$DQLITE" = true ]; then \ tar xzf /usr/src/dqlite.tgz -C / && \ apk add --allow-untrusted /usr/local/packages/*.apk \ diff --git a/go.mod b/go.mod index 1352343c0f..19783a7ecf 100644 --- a/go.mod +++ b/go.mod @@ -67,7 +67,7 @@ require ( github.com/bhendo/go-powershell v0.0.0-20190719160123-219e7fb4e41e // indirect github.com/bronze1man/goStrongswanVici v0.0.0-20190828090544-27d02f80ba40 // indirect github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect - github.com/canonical/go-dqlite v1.3.0 + github.com/canonical/go-dqlite v1.5.1 github.com/containerd/cgroups v0.0.0-00010101000000-000000000000 // indirect github.com/containerd/containerd v1.3.0-beta.2.0.20190828155532-0293cbd26c69 github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 // indirect @@ -102,7 +102,7 @@ require ( github.com/rakelkar/gonetsh v0.0.0-20190719023240-501daadcadf8 // indirect github.com/rancher/dynamiclistener v0.2.0 github.com/rancher/helm-controller v0.5.0 - github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129 + github.com/rancher/kine v0.4.0 github.com/rancher/remotedialer v0.2.0 github.com/rancher/wrangler v0.6.1 github.com/rancher/wrangler-api v0.6.0 diff --git a/go.sum b/go.sum index 274f6d8d1d..7effcea08c 100644 --- a/go.sum +++ b/go.sum @@ -99,6 +99,8 @@ github.com/caddyserver/caddy v1.0.3/go.mod h1:G+ouvOY32gENkJC+jhgl62TyhvqEsFaDiZ github.com/canonical/go-dqlite v1.2.0/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y= github.com/canonical/go-dqlite v1.3.0 h1:c+7eGZfh0K7yCmGrBkNRGZdY8R8+2jSSkz6Zr3YCjJE= github.com/canonical/go-dqlite v1.3.0/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y= +github.com/canonical/go-dqlite v1.5.1 h1:1YjtIrFsC1A3XlgsX38ARAiKhvkZS63PqsEd8z3T4yU= +github.com/canonical/go-dqlite v1.5.1/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y= github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/prettybench v0.0.0-20150116022406-03b8cfe5406c/go.mod h1:Xe6ZsFhtM8HrDku0pxJ3/Lr51rwykrzgFwpmTzleatY= @@ -650,6 +652,8 @@ github.com/rancher/kine v0.3.5 h1:Tm4eOtejpnzs1WFBrXj76lCLvX9czLlTkgqUk9luCQk= github.com/rancher/kine v0.3.5/go.mod h1:xEMl0tLCva9/9me7mXJ3m9Vo6yqHgC4OU3NiK4CPrGQ= github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129 h1:4HYvCG8+pfkBlpYLv/5ZOdxagg3jTszMOqMjamMQ0hA= github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129/go.mod h1:xEMl0tLCva9/9me7mXJ3m9Vo6yqHgC4OU3NiK4CPrGQ= +github.com/rancher/kine v0.4.0 h1:1IhWy3TzjExG8xnj46eyUEWdzqNAD1WrgL4eEBKm6Uc= +github.com/rancher/kine v0.4.0/go.mod h1:IImtCJ68AIkE+VY/kUI0NkyJL5q5WzO8QvMsSXqbrpA= github.com/rancher/kubernetes v1.18.2-k3s.1 h1:LhWNObWF7dL/+T57LkYpuRKtsCBpt0P5G6dRVFG+Ncs= github.com/rancher/kubernetes v1.18.2-k3s.1/go.mod h1:z8xjOOO1Ljz+TaHpOxVGC7cxtF32TesIamoQ+BZrVS0= github.com/rancher/kubernetes/staging/src/k8s.io/api v1.18.2-k3s.1 h1:tYDY9g8+xLwUcsG9T6Xg7cBkO/vgU6yv7cQKqUN6NDE= diff --git a/vendor/github.com/canonical/go-dqlite/.gitignore b/vendor/github.com/canonical/go-dqlite/.gitignore index d3da31a83a..19e28f970a 100644 --- a/vendor/github.com/canonical/go-dqlite/.gitignore +++ b/vendor/github.com/canonical/go-dqlite/.gitignore @@ -1,4 +1,6 @@ .sqlite +cmd/dqlite/dqlite +cmd/dqlite-demo/dqlite-demo demo profile.coverprofile overalls.coverprofile diff --git a/vendor/github.com/canonical/go-dqlite/README.md b/vendor/github.com/canonical/go-dqlite/README.md index c8f8c20092..4f9d48e845 100644 --- a/vendor/github.com/canonical/go-dqlite/README.md +++ b/vendor/github.com/canonical/go-dqlite/README.md @@ -10,9 +10,32 @@ Usage The best way to understand how to use the ```go-dqlite``` package is probably by looking at the source code of the [demo -program](https://github.com/canonical/go-dqlite/tree/master/cmd/dqlite-demo) and +program](https://github.com/canonical/go-dqlite/tree/master/cmd/dqlite-demo/main.go) and use it as example. +In general your application will use code such as: + + +```go +dir := "/path/to/data/directory" +address := "1.2.3.4:666" // Unique node address +cluster := []string{...} // Optional list of existing nodes, when starting a new node +app, err := app.New(dir, app.WithAddress(address), app.WithCluster(cluster)) +if err != nil { + // ... +} + +db, err := app.Open(context.Background(), "my-database") +if err != nil { + // ... +} + +// db is a *sql.DB object +if _, err := db.Exec("CREATE TABLE my_table (n INT)"); err != nil + // ... +} +``` + Build ----- @@ -50,42 +73,61 @@ go install -tags libsqlite3 ./cmd/dqlite-demo from the top-level directory of this repository. -Once the ```dqlite-demo``` binary is installed, start three nodes of the demo -application, respectively with IDs ```1```, ```2,``` and ```3```: +This builds a demo dqlite application, which exposes a simple key/value store +over an HTTP API. + +Once the `dqlite-demo` binary is installed (normally under `~/go/bin`), +start three nodes of the demo application: ```bash -dqlite-demo start 1 & -dqlite-demo start 2 & -dqlite-demo start 3 & +dqlite-demo --api 127.0.0.1:8001 --db 127.0.0.1:9001 & +dqlite-demo --api 127.0.0.1:8002 --db 127.0.0.1:9002 --join 127.0.0.1:9001 & +dqlite-demo --api 127.0.0.1:8003 --db 127.0.0.1:9003 --join 127.0.0.1:9001 & ``` -The node with ID ```1``` automatically becomes the leader of a single node -cluster, while the nodes with IDs ```2``` and ```3``` are waiting to be notified -what cluster they belong to. Let's make nodes ```2``` and ```3``` join the -cluster: +The `--api` flag tells the demo program where to expose its HTTP API. + +The `--db` flag tells the demo program to use the given address for internal +database replication. + +The `--join` flag is optional and should be used only for additional nodes after +the first one. It informs them about the existing cluster, so they can +automatically join it. + +Now we can start using the cluster. Let's insert a key pair: ```bash -dqlite-demo add 2 -dqlite-demo add 3 -``` - -Now we can start using the cluster. The demo application is just a simple -key/value store that stores data in a SQLite table. Let's insert a key pair: - -```bash -dqlite-demo update my-key my-value +curl -X PUT -d my-key http://127.0.0.1:8001/my-value ``` and then retrive it from the database: ```bash -dqlite-demo query my-key +curl http://127.0.0.1:8001/my-value ``` -Currently node ```1``` is the leader. If we stop it and then try to query the -key again we'll notice that the ```query``` command hangs for a bit waiting for -the failover to occur and for another node to step up as leader: +Currently the first node is the leader. If we stop it and then try to query the +key again curl will fail, but we can simply change the endpoint to another node +and things will work since an automatic failover has taken place: + +```bash +kill -TERM %1; curl http://127.0.0.1:8002/my-value +``` + +Shell +------ + +A basic SQLite-like dqlite shell can be built with: ``` -kill -TERM %1; sleep 0.1; dqlite-demo query my-key; dqlite-demo cluster +go install -tags libsqlite3 ./cmd/dqlite ``` + +You can test it with the `dqlite-demo` with: + +``` +dqlite -s 127.0.0.1:9001 +``` + +It supports normal SQL queries plus the special `.cluster` and `.leader` +commands to inspect the cluster members and the current leader. diff --git a/vendor/github.com/canonical/go-dqlite/client/client.go b/vendor/github.com/canonical/go-dqlite/client/client.go index 0460bbc8ad..0df51efe5d 100644 --- a/vendor/github.com/canonical/go-dqlite/client/client.go +++ b/vendor/github.com/canonical/go-dqlite/client/client.go @@ -2,10 +2,6 @@ package client import ( "context" - "encoding/binary" - "io" - "net" - "strings" "github.com/canonical/go-dqlite/internal/protocol" "github.com/pkg/errors" @@ -57,22 +53,13 @@ func New(ctx context.Context, address string, options ...Option) (*Client, error return nil, errors.Wrap(err, "failed to establish network connection") } - // Latest protocol version. - proto := make([]byte, 8) - binary.LittleEndian.PutUint64(proto, protocol.VersionOne) - - // Perform the protocol handshake. - n, err := conn.Write(proto) + protocol, err := protocol.Handshake(ctx, conn, protocol.VersionOne) if err != nil { conn.Close() - return nil, errors.Wrap(err, "failed to send handshake") - } - if n != 8 { - conn.Close() - return nil, errors.Wrap(io.ErrShortWrite, "failed to send handshake") + return nil, err } - client := &Client{protocol: protocol.NewProtocol(protocol.VersionOne, conn)} + client := &Client{protocol: protocol} return client, nil } @@ -274,10 +261,3 @@ func defaultOptions() *options { LogFunc: DefaultLogFunc, } } - -func DefaultDialFunc(ctx context.Context, address string) (net.Conn, error) { - if strings.HasPrefix(address, "@") { - return protocol.UnixDial(ctx, address) - } - return protocol.TCPDial(ctx, address) -} diff --git a/vendor/github.com/canonical/go-dqlite/client/dial.go b/vendor/github.com/canonical/go-dqlite/client/dial.go new file mode 100644 index 0000000000..d22192cf7b --- /dev/null +++ b/vendor/github.com/canonical/go-dqlite/client/dial.go @@ -0,0 +1,33 @@ +package client + +import ( + "context" + "crypto/tls" + "net" + "strings" + + "github.com/canonical/go-dqlite/internal/protocol" +) + +// DefaultDialFunc is the default dial function, which can handle plain TCP and +// Unix socket endpoints. You can customize it with WithDialFunc() +func DefaultDialFunc(ctx context.Context, address string) (net.Conn, error) { + if strings.HasPrefix(address, "@") { + return protocol.UnixDial(ctx, address) + } + return protocol.TCPDial(ctx, address) +} + +// DialFuncWithTLS returns a dial function that uses TLS encryption. +// +// The given dial function will be used to establish the network connection, +// and the given TLS config will be used for encryption. +func DialFuncWithTLS(dial DialFunc, config *tls.Config) DialFunc { + return func(ctx context.Context, addr string) (net.Conn, error) { + conn, err := dial(ctx, addr) + if err != nil { + return nil, err + } + return tls.Client(conn, config), nil + } +} diff --git a/vendor/github.com/canonical/go-dqlite/client/leader.go b/vendor/github.com/canonical/go-dqlite/client/leader.go index 4cc305fa76..3cc387f14e 100644 --- a/vendor/github.com/canonical/go-dqlite/client/leader.go +++ b/vendor/github.com/canonical/go-dqlite/client/leader.go @@ -2,10 +2,7 @@ package client import ( "context" - "time" - "github.com/Rican7/retry/backoff" - "github.com/Rican7/retry/strategy" "github.com/canonical/go-dqlite/internal/protocol" ) @@ -18,10 +15,7 @@ func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Clien } config := protocol.Config{ - Dial: o.DialFunc, - AttemptTimeout: time.Second, - RetryStrategies: []strategy.Strategy{ - strategy.Backoff(backoff.BinaryExponential(time.Millisecond))}, + Dial: o.DialFunc, } connector := protocol.NewConnector(0, store, config, o.LogFunc) protocol, err := connector.Connect(ctx) diff --git a/vendor/github.com/canonical/go-dqlite/client/log.go b/vendor/github.com/canonical/go-dqlite/client/log.go index 0d3ead7bc3..9fd99c605a 100644 --- a/vendor/github.com/canonical/go-dqlite/client/log.go +++ b/vendor/github.com/canonical/go-dqlite/client/log.go @@ -1,10 +1,6 @@ package client import ( - "fmt" - "log" - "os" - "github.com/canonical/go-dqlite/internal/logging" ) @@ -22,11 +18,5 @@ const ( LogError = logging.Error ) -var ( - logger = log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds) -) - -// DefaultLogFunc emits messages using the stdlib's logger. -func DefaultLogFunc(l LogLevel, format string, a ...interface{}) { - logger.Output(2, fmt.Sprintf("[%s]: %s", l.String(), fmt.Sprintf(format, a...))) -} +// DefaultLogFunc doesn't emit any message. +func DefaultLogFunc(l LogLevel, format string, a ...interface{}) {} diff --git a/vendor/github.com/canonical/go-dqlite/client/store.go b/vendor/github.com/canonical/go-dqlite/client/store.go index 94c5f470b2..c71ec2dcb0 100644 --- a/vendor/github.com/canonical/go-dqlite/client/store.go +++ b/vendor/github.com/canonical/go-dqlite/client/store.go @@ -4,7 +4,11 @@ import ( "context" "database/sql" "fmt" + "io/ioutil" + "os" + "sync" + "github.com/ghodss/yaml" "github.com/pkg/errors" "github.com/canonical/go-dqlite/internal/protocol" @@ -163,3 +167,65 @@ func (d *DatabaseNodeStore) Set(ctx context.Context, servers []NodeInfo) error { return nil } + +// Persists a list addresses of dqlite nodes in a YAML file. +type YamlNodeStore struct { + path string + servers []NodeInfo + mu sync.RWMutex +} + +// NewYamlNodeStore creates a new YamlNodeStore backed by the given YAML file. +func NewYamlNodeStore(path string) (*YamlNodeStore, error) { + servers := []NodeInfo{} + + _, err := os.Stat(path) + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + } else { + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + + if err := yaml.Unmarshal(data, &servers); err != nil { + return nil, err + } + } + + store := &YamlNodeStore{ + path: path, + servers: servers, + } + + return store, nil +} + +// Get the current servers. +func (s *YamlNodeStore) Get(ctx context.Context) ([]NodeInfo, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.servers, nil +} + +// Set the servers addresses. +func (s *YamlNodeStore) Set(ctx context.Context, servers []NodeInfo) error { + s.mu.Lock() + defer s.mu.Unlock() + + data, err := yaml.Marshal(servers) + if err != nil { + return err + } + + if err := ioutil.WriteFile(s.path, data, 0600); err != nil { + return err + } + + s.servers = servers + + return nil +} diff --git a/vendor/github.com/canonical/go-dqlite/config.go b/vendor/github.com/canonical/go-dqlite/config.go index 9b7abd87b3..6436160579 100644 --- a/vendor/github.com/canonical/go-dqlite/config.go +++ b/vendor/github.com/canonical/go-dqlite/config.go @@ -5,6 +5,7 @@ import ( "os" "github.com/canonical/go-dqlite/internal/bindings" + "github.com/canonical/go-dqlite/internal/protocol" "github.com/pkg/errors" ) @@ -26,7 +27,7 @@ import ( // from setting Single-thread mode at all. func ConfigMultiThread() error { if err := bindings.ConfigMultiThread(); err != nil { - if err, ok := err.(bindings.Error); ok && err.Code == 21 /* SQLITE_MISUSE */ { + if err, ok := err.(protocol.Error); ok && err.Code == 21 /* SQLITE_MISUSE */ { return fmt.Errorf("SQLite is already initialized") } return errors.Wrap(err, "unknown error") diff --git a/vendor/github.com/canonical/go-dqlite/driver/driver.go b/vendor/github.com/canonical/go-dqlite/driver/driver.go index 874366aba6..68c19fcfd6 100644 --- a/vendor/github.com/canonical/go-dqlite/driver/driver.go +++ b/vendor/github.com/canonical/go-dqlite/driver/driver.go @@ -24,12 +24,9 @@ import ( "syscall" "time" - "github.com/Rican7/retry/backoff" - "github.com/Rican7/retry/strategy" "github.com/pkg/errors" "github.com/canonical/go-dqlite/client" - "github.com/canonical/go-dqlite/internal/bindings" "github.com/canonical/go-dqlite/internal/protocol" ) @@ -44,7 +41,7 @@ type Driver struct { } // Error is returned in case of database errors. -type Error = bindings.Error +type Error = protocol.Error // Error codes. Values here mostly overlap with native SQLite codes. const ( @@ -87,6 +84,9 @@ func WithDialFunc(dial DialFunc) Option { // WithConnectionTimeout sets the connection timeout. // // If not used, the default is 5 seconds. +// +// DEPRECATED: Connection cancellation is supported via the driver.Connector +// interface, which is used internally by the stdlib sql package. func WithConnectionTimeout(timeout time.Duration) Option { return func(options *options) { options.ConnectionTimeout = timeout @@ -96,7 +96,7 @@ func WithConnectionTimeout(timeout time.Duration) Option { // WithConnectionBackoffFactor sets the exponential backoff factor for retrying // failed connection attempts. // -// If not used, the default is 50 milliseconds. +// If not used, the default is 100 milliseconds. func WithConnectionBackoffFactor(factor time.Duration) Option { return func(options *options) { options.ConnectionBackoffFactor = factor @@ -113,7 +113,28 @@ func WithConnectionBackoffCap(cap time.Duration) Option { } } +// WithAttemptTimeout sets the timeout for each individual connection attempt . +// +// If not used, the default is 60 seconds. +func WithAttemptTimeout(timeout time.Duration) Option { + return func(options *options) { + options.AttemptTimeout = timeout + } +} + +// WithRetryLimit sets the maximum number of connection retries. +// +// If not used, the default is 0 (unlimited retries) +func WithRetryLimit(limit uint) Option { + return func(options *options) { + options.RetryLimit = limit + } +} + // WithContext sets a global cancellation context. +// +// DEPRECATED: This API is no a no-op. Users should explicitly pass a context +// if they wish to cancel their requests. func WithContext(context context.Context) Option { return func(options *options) { options.Context = context @@ -123,7 +144,8 @@ func WithContext(context context.Context) Option { // WithContextTimeout sets the default client context timeout when no context // deadline is provided. // -// If not used, the default is 5 seconds. +// DEPRECATED: This API is no a no-op. Users should explicitly pass a context +// if they wish to cancel their requests. func WithContextTimeout(timeout time.Duration) Option { return func(options *options) { options.ContextTimeout = timeout @@ -148,13 +170,10 @@ func New(store client.NodeStore, options ...Option) (*Driver, error) { } driver.clientConfig.Dial = o.Dial - driver.clientConfig.AttemptTimeout = 5 * time.Second - driver.clientConfig.RetryStrategies = []strategy.Strategy{ - driverConnectionRetryStrategy( - o.ConnectionBackoffFactor, - o.ConnectionBackoffCap, - ), - } + driver.clientConfig.AttemptTimeout = o.AttemptTimeout + driver.clientConfig.BackoffFactor = o.ConnectionBackoffFactor + driver.clientConfig.BackoffCap = o.ConnectionBackoffCap + driver.clientConfig.RetryLimit = o.RetryLimit return driver, nil } @@ -163,63 +182,48 @@ func New(store client.NodeStore, options ...Option) (*Driver, error) { type options struct { Log client.LogFunc Dial protocol.DialFunc + AttemptTimeout time.Duration ConnectionTimeout time.Duration ContextTimeout time.Duration ConnectionBackoffFactor time.Duration ConnectionBackoffCap time.Duration + RetryLimit uint Context context.Context } // Create a options object with sane defaults. func defaultOptions() *options { return &options{ - Log: client.DefaultLogFunc, - Dial: client.DefaultDialFunc, - ConnectionTimeout: 15 * time.Second, - ContextTimeout: 2 * time.Second, - ConnectionBackoffFactor: 50 * time.Millisecond, - ConnectionBackoffCap: time.Second, - Context: context.Background(), + Log: client.DefaultLogFunc, + Dial: client.DefaultDialFunc, } } -// Return a retry strategy with jittered exponential backoff, capped at the -// given amount of time. -func driverConnectionRetryStrategy(factor, cap time.Duration) strategy.Strategy { - backoff := backoff.BinaryExponential(factor) - - return func(attempt uint) bool { - if attempt > 0 { - duration := backoff(attempt) - if duration > cap { - duration = cap - } - time.Sleep(duration) - } - - return true - } +// A Connector represents a driver in a fixed configuration and can create any +// number of equivalent Conns for use by multiple goroutines. +type Connector struct { + uri string + driver *Driver } -// Open establishes a new connection to a SQLite database on the dqlite server. -// -// The given name must be a pure file name without any directory segment, -// dqlite will connect to a database with that name in its data directory. -// -// Query parameters are always valid except for "mode=memory". -// -// If this node is not the leader, or the leader is unknown an ErrNotLeader -// error is returned. -func (d *Driver) Open(uri string) (driver.Conn, error) { - ctx, cancel := context.WithTimeout(d.context, d.connectionTimeout) - defer cancel() +// Connect returns a connection to the database. +func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) { + if c.driver.context != nil { + ctx = c.driver.context + } + + if c.driver.connectionTimeout != 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, c.driver.connectionTimeout) + defer cancel() + } // TODO: generate a client ID. - connector := protocol.NewConnector(0, d.store, d.clientConfig, d.log) + connector := protocol.NewConnector(0, c.driver.store, c.driver.clientConfig, c.driver.log) conn := &Conn{ - log: d.log, - contextTimeout: d.contextTimeout, + log: c.driver.log, + contextTimeout: c.driver.contextTimeout, } var err error @@ -227,15 +231,11 @@ func (d *Driver) Open(uri string) (driver.Conn, error) { if err != nil { return nil, errors.Wrap(err, "failed to create dqlite connection") } - conn.protocol.SetContextTimeout(d.contextTimeout) conn.request.Init(4096) conn.response.Init(4096) - defer conn.request.Reset() - defer conn.response.Reset() - - protocol.EncodeOpen(&conn.request, uri, 0, "volatile") + protocol.EncodeOpen(&conn.request, c.uri, 0, "volatile") if err := conn.protocol.Call(ctx, &conn.request, &conn.response); err != nil { conn.protocol.Close() @@ -251,11 +251,45 @@ func (d *Driver) Open(uri string) (driver.Conn, error) { return conn, nil } +// Driver returns the underlying Driver of the Connector, +func (c *Connector) Driver() driver.Driver { + return c.driver +} + +// OpenConnector must parse the name in the same format that Driver.Open +// parses the name parameter. +func (d *Driver) OpenConnector(name string) (driver.Connector, error) { + connector := &Connector{ + uri: name, + driver: d, + } + return connector, nil +} + +// Open establishes a new connection to a SQLite database on the dqlite server. +// +// The given name must be a pure file name without any directory segment, +// dqlite will connect to a database with that name in its data directory. +// +// Query parameters are always valid except for "mode=memory". +// +// If this node is not the leader, or the leader is unknown an ErrNotLeader +// error is returned. +func (d *Driver) Open(uri string) (driver.Conn, error) { + connector, err := d.OpenConnector(uri) + if err != nil { + return nil, err + } + + return connector.Connect(context.Background()) +} + // SetContextTimeout sets the default client timeout when no context deadline // is provided. -func (d *Driver) SetContextTimeout(timeout time.Duration) { - d.contextTimeout = timeout -} +// +// DEPRECATED: This API is no a no-op. Users should explicitly pass a context +// if they wish to cancel their requests. +func (d *Driver) SetContextTimeout(timeout time.Duration) {} // ErrNoAvailableLeader is returned as root cause of Open() if there's no // leader available in the cluster. @@ -275,9 +309,6 @@ type Conn struct { // context is for the preparation of the statement, it must not store the // context within the statement itself. func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { - defer c.request.Reset() - defer c.response.Reset() - stmt := &Stmt{ protocol: c.protocol, request: &c.request, @@ -306,9 +337,6 @@ func (c *Conn) Prepare(query string) (driver.Stmt, error) { // ExecContext is an optional interface that may be implemented by a Conn. func (c *Conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { - defer c.request.Reset() - defer c.response.Reset() - protocol.EncodeExecSQL(&c.request, uint64(c.id), query, args) if err := c.protocol.Call(ctx, &c.request, &c.response); err != nil { @@ -330,18 +358,14 @@ func (c *Conn) Query(query string, args []driver.Value) (driver.Rows, error) { // QueryContext is an optional interface that may be implemented by a Conn. func (c *Conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { - defer c.request.Reset() - protocol.EncodeQuerySQL(&c.request, uint64(c.id), query, args) if err := c.protocol.Call(ctx, &c.request, &c.response); err != nil { - c.response.Reset() return nil, driverError(err) } rows, err := protocol.DecodeRows(&c.response) if err != nil { - c.response.Reset() return nil, driverError(err) } @@ -391,7 +415,15 @@ func (c *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, e // // Deprecated: Drivers should implement ConnBeginTx instead (or additionally). func (c *Conn) Begin() (driver.Tx, error) { - return c.BeginTx(context.Background(), driver.TxOptions{}) + ctx := context.Background() + + if c.contextTimeout > 0 { + var cancel func() + ctx, cancel = context.WithTimeout(context.Background(), c.contextTimeout) + defer cancel() + } + + return c.BeginTx(ctx, driver.TxOptions{}) } // Tx is a transaction. @@ -401,8 +433,7 @@ type Tx struct { // Commit the transaction. func (tx *Tx) Commit() error { - ctx, cancel := context.WithTimeout(context.Background(), tx.conn.contextTimeout) - defer cancel() + ctx := context.Background() if _, err := tx.conn.ExecContext(ctx, "COMMIT", nil); err != nil { return driverError(err) @@ -413,8 +444,7 @@ func (tx *Tx) Commit() error { // Rollback the transaction. func (tx *Tx) Rollback() error { - ctx, cancel := context.WithTimeout(context.Background(), tx.conn.contextTimeout) - defer cancel() + ctx := context.Background() if _, err := tx.conn.ExecContext(ctx, "ROLLBACK", nil); err != nil { return driverError(err) @@ -436,9 +466,6 @@ type Stmt struct { // Close closes the statement. func (s *Stmt) Close() error { - defer s.request.Reset() - defer s.response.Reset() - protocol.EncodeFinalize(s.request, s.db, s.id) ctx := context.Background() @@ -464,9 +491,6 @@ func (s *Stmt) NumInput() int { // // ExecContext must honor the context timeout and return when it is canceled. func (s *Stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) { - defer s.request.Reset() - defer s.response.Reset() - protocol.EncodeExec(s.request, s.db, s.id, args) if err := s.protocol.Call(ctx, s.request, s.response); err != nil { @@ -491,22 +515,14 @@ func (s *Stmt) Exec(args []driver.Value) (driver.Result, error) { // // QueryContext must honor the context timeout and return when it is canceled. func (s *Stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) { - defer s.request.Reset() - - // FIXME: this shouldn't be needed but we have hit a few panics - // probably due to the response object not being fully reset. - s.response.Reset() - protocol.EncodeQuery(s.request, s.db, s.id, args) if err := s.protocol.Call(ctx, s.request, s.response); err != nil { - s.response.Reset() return nil, driverError(err) } rows, err := protocol.DecodeRows(s.response) if err != nil { - s.response.Reset() return nil, driverError(err) } diff --git a/vendor/github.com/canonical/go-dqlite/internal/bindings/errors.go b/vendor/github.com/canonical/go-dqlite/internal/bindings/errors.go deleted file mode 100644 index 1fe3caacb6..0000000000 --- a/vendor/github.com/canonical/go-dqlite/internal/bindings/errors.go +++ /dev/null @@ -1,19 +0,0 @@ -package bindings - -/* -#include -*/ -import "C" - -// Error holds information about a SQLite error. -type Error struct { - Code int - Message string -} - -func (e Error) Error() string { - if e.Message != "" { - return e.Message - } - return C.GoString(C.sqlite3_errstr(C.int(e.Code))) -} diff --git a/vendor/github.com/canonical/go-dqlite/internal/bindings/server.go b/vendor/github.com/canonical/go-dqlite/internal/bindings/server.go index 36705c48ed..bdbebfc1dc 100644 --- a/vendor/github.com/canonical/go-dqlite/internal/bindings/server.go +++ b/vendor/github.com/canonical/go-dqlite/internal/bindings/server.go @@ -90,14 +90,14 @@ func init() { func ConfigSingleThread() error { if rc := C.sqlite3ConfigSingleThread(); rc != 0 { - return Error{Code: int(rc)} + return protocol.Error{Message: C.GoString(C.sqlite3_errstr(rc)), Code: int(rc)} } return nil } func ConfigMultiThread() error { if rc := C.sqlite3ConfigMultiThread(); rc != 0 { - return Error{Code: int(rc)} + return protocol.Error{Message: C.GoString(C.sqlite3_errstr(rc)), Code: int(rc)} } return nil } @@ -197,6 +197,14 @@ func (s *Node) Recover(cluster []protocol.NodeInfo) error { return nil } +// GenerateID generates a unique ID for a server. +func GenerateID(address string) uint64 { + caddress := C.CString(address) + defer C.free(unsafe.Pointer(caddress)) + id := C.dqlite_generate_node_id(caddress) + return uint64(id) +} + // Extract the underlying socket from a connection. func connToSocket(conn net.Conn) (C.int, error) { file, err := conn.(fileConn).File() diff --git a/vendor/github.com/canonical/go-dqlite/internal/protocol/config.go b/vendor/github.com/canonical/go-dqlite/internal/protocol/config.go index 9b89d57f63..de5272df05 100644 --- a/vendor/github.com/canonical/go-dqlite/internal/protocol/config.go +++ b/vendor/github.com/canonical/go-dqlite/internal/protocol/config.go @@ -2,13 +2,14 @@ package protocol import ( "time" - - "github.com/Rican7/retry/strategy" ) // Config holds various configuration parameters for a dqlite client. type Config struct { - Dial DialFunc // Network dialer. - AttemptTimeout time.Duration // Timeout for each individual Dial attempt. - RetryStrategies []strategy.Strategy // Strategies used for retrying to connect to a leader. + Dial DialFunc // Network dialer. + DialTimeout time.Duration // Timeout for establishing a network connection . + AttemptTimeout time.Duration // Timeout for each individual attempt to probe a server's leadership. + BackoffFactor time.Duration // Exponential backoff factor for retries. + BackoffCap time.Duration // Maximum connection retry backoff value, + RetryLimit uint // Maximum number of retries, or 0 for unlimited. } diff --git a/vendor/github.com/canonical/go-dqlite/internal/protocol/connector.go b/vendor/github.com/canonical/go-dqlite/internal/protocol/connector.go index d77d09e2a3..c3596aed1d 100644 --- a/vendor/github.com/canonical/go-dqlite/internal/protocol/connector.go +++ b/vendor/github.com/canonical/go-dqlite/internal/protocol/connector.go @@ -6,8 +6,11 @@ import ( "fmt" "io" "net" + "time" "github.com/Rican7/retry" + "github.com/Rican7/retry/backoff" + "github.com/Rican7/retry/strategy" "github.com/canonical/go-dqlite/internal/logging" "github.com/pkg/errors" ) @@ -27,6 +30,22 @@ type Connector struct { // NewConnector returns a new connector that can be used by a dqlite driver to // create new clients connected to a leader dqlite server. func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) *Connector { + if config.DialTimeout == 0 { + config.DialTimeout = 10 * time.Second + } + + if config.AttemptTimeout == 0 { + config.AttemptTimeout = 60 * time.Second + } + + if config.BackoffFactor == 0 { + config.BackoffFactor = 100 * time.Millisecond + } + + if config.BackoffCap == 0 { + config.BackoffCap = time.Second + } + connector := &Connector{ id: id, store: store, @@ -43,6 +62,8 @@ func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) * func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { var protocol *Protocol + strategies := makeRetryStrategies(c.config.BackoffFactor, c.config.BackoffCap, c.config.RetryLimit) + // The retry strategy should be configured to retry indefinitely, until // the given context is done. err := retry.Retry(func(attempt uint) error { @@ -61,17 +82,16 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { var err error protocol, err = c.connectAttemptAll(ctx, log) if err != nil { - log(logging.Debug, "connection failed err=%v", err) return err } return nil - }, c.config.RetryStrategies...) + }, strategies...) if err != nil { - // The retry strategy should never give up until success or - // context expiration. - panic("connect retry aborted unexpectedly") + // We exhausted the number of retries allowed by the configured + // strategy. + return nil, ErrNoAvailableLeader } if ctx.Err() != nil { @@ -92,7 +112,7 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P // Make an attempt for each address until we find the leader. for _, server := range servers { log := func(l logging.Level, format string, a ...interface{}) { - format += fmt.Sprintf(" address=%s id=%d", server.Address, server.ID) + format += fmt.Sprintf(" address=%s", server.Address) log(l, format, a...) } @@ -107,68 +127,71 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P } if err != nil { // This server is unavailable, try with the next target. - log(logging.Debug, "server connection failed err=%v", err) + log(logging.Warn, "server unavailable err=%v", err) continue } if protocol != nil { // We found the leader - log(logging.Info, "connected") + log(logging.Debug, "connected") return protocol, nil } if leader == "" { // This server does not know who the current leader is, // try with the next target. + log(logging.Warn, "no known leader") continue } // If we get here, it means this server reported that another // server is the leader, let's close the connection to this // server and try with the suggested one. - //logger = logger.With(zap.String("leader", leader)) + log(logging.Debug, "connect to reported leader %s", leader) + + ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout) + defer cancel() + protocol, leader, err = c.connectAttemptOne(ctx, leader, version) if err != nil { // The leader reported by the previous server is // unavailable, try with the next target. - //logger.Info("leader server connection failed", zap.String("err", err.Error())) + log(logging.Warn, "reported leader unavailable err=%v", err) continue } if protocol == nil { // The leader reported by the target server does not consider itself // the leader, try with the next target. - //logger.Info("reported leader server is not the leader") + log(logging.Warn, "reported leader server is not the leader") continue } - log(logging.Info, "connected") + log(logging.Debug, "connected") return protocol, nil } return nil, ErrNoAvailableLeader } -// Connect establishes a connection with a dqlite node. -func Connect(ctx context.Context, dial DialFunc, address string, version uint64) (*Protocol, error) { - // Establish the connection. - conn, err := dial(ctx, address) - if err != nil { - return nil, errors.Wrap(err, "failed to establish network connection") - } - +// Perform the initial handshake using the given protocol version. +func Handshake(ctx context.Context, conn net.Conn, version uint64) (*Protocol, error) { // Latest protocol version. protocol := make([]byte, 8) binary.LittleEndian.PutUint64(protocol, version) + // Honor the ctx deadline, if present. + if deadline, ok := ctx.Deadline(); ok { + conn.SetDeadline(deadline) + defer conn.SetDeadline(time.Time{}) + } + // Perform the protocol handshake. n, err := conn.Write(protocol) if err != nil { - conn.Close() return nil, errors.Wrap(err, "failed to send handshake") } if n != 8 { - conn.Close() return nil, errors.Wrap(io.ErrShortWrite, "failed to send handshake") } - return NewProtocol(version, conn), nil + return newProtocol(version, conn), nil } // Connect to the given dqlite server and check if it's the leader. @@ -181,8 +204,18 @@ func Connect(ctx context.Context, dial DialFunc, address string, version uint64) // - Target is the leader: -> server, "", nil // func (c *Connector) connectAttemptOne(ctx context.Context, address string, version uint64) (*Protocol, string, error) { - protocol, err := Connect(ctx, c.config.Dial, address, version) + dialCtx, cancel := context.WithTimeout(ctx, c.config.DialTimeout) + defer cancel() + + // Establish the connection. + conn, err := c.config.Dial(dialCtx, address) if err != nil { + return nil, "", errors.Wrap(err, "failed to establish network connection") + } + + protocol, err := Handshake(ctx, conn, version) + if err != nil { + conn.Close() return nil, "", err } @@ -219,8 +252,8 @@ func (c *Connector) connectAttemptOne(ctx context.Context, address string, versi return nil, "", nil case address: // This server is the leader, register ourselves and return. - request.Reset() - response.Reset() + request.reset() + response.reset() EncodeClient(&request, c.id) @@ -247,4 +280,33 @@ func (c *Connector) connectAttemptOne(ctx context.Context, address string, versi } } +// Return a retry strategy with exponential backoff, capped at the given amount +// of time and possibly with a maximum number of retries. +func makeRetryStrategies(factor, cap time.Duration, limit uint) []strategy.Strategy { + backoff := backoff.BinaryExponential(factor) + + strategies := []strategy.Strategy{} + + if limit > 0 { + strategies = append(strategies, strategy.Limit(limit)) + } + + strategies = append(strategies, + func(attempt uint) bool { + if attempt > 0 { + duration := backoff(attempt) + // Duration might be negative in case of integer overflow. + if duration > cap || duration <= 0 { + duration = cap + } + time.Sleep(duration) + } + + return true + }, + ) + + return strategies +} + var errBadProtocol = fmt.Errorf("bad protocol") diff --git a/vendor/github.com/canonical/go-dqlite/internal/protocol/dial.go b/vendor/github.com/canonical/go-dqlite/internal/protocol/dial.go index 1f1e9b89aa..2cbe1f0a4c 100644 --- a/vendor/github.com/canonical/go-dqlite/internal/protocol/dial.go +++ b/vendor/github.com/canonical/go-dqlite/internal/protocol/dial.go @@ -2,6 +2,7 @@ package protocol import ( "context" + "crypto/tls" "net" ) @@ -18,3 +19,15 @@ func UnixDial(ctx context.Context, address string) (net.Conn, error) { dialer := net.Dialer{} return dialer.DialContext(ctx, "unix", address) } + +// TLSCipherSuites are the cipher suites by the go-dqlite TLS helpers. +var TLSCipherSuites = []uint16{ + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, +} diff --git a/vendor/github.com/canonical/go-dqlite/internal/protocol/errors.go b/vendor/github.com/canonical/go-dqlite/internal/protocol/errors.go index 4e5875aede..79d1fedc60 100644 --- a/vendor/github.com/canonical/go-dqlite/internal/protocol/errors.go +++ b/vendor/github.com/canonical/go-dqlite/internal/protocol/errors.go @@ -27,3 +27,13 @@ func (e ErrRequest) Error() string { // ErrRowsPart is returned when the first batch of a multi-response result // batch is done. var ErrRowsPart = fmt.Errorf("not all rows were returned in this response") + +// Error holds information about a SQLite error. +type Error struct { + Code int + Message string +} + +func (e Error) Error() string { + return e.Message +} diff --git a/vendor/github.com/canonical/go-dqlite/internal/protocol/message.go b/vendor/github.com/canonical/go-dqlite/internal/protocol/message.go index 55b3a03e18..c726d871e8 100644 --- a/vendor/github.com/canonical/go-dqlite/internal/protocol/message.go +++ b/vendor/github.com/canonical/go-dqlite/internal/protocol/message.go @@ -26,24 +26,23 @@ type Message struct { flags uint8 extra uint16 header []byte // Statically allocated header buffer - body1 buffer // Statically allocated body data, using bytes - body2 buffer // Dynamically allocated body data + body buffer // Message body data. } -// Init initializes the message using the given size of the statically -// allocated buffer (i.e. a buffer which is re-used across requests or -// responses encoded or decoded using this message object). -func (m *Message) Init(staticSize int) { - if (staticSize % messageWordSize) != 0 { - panic("static size is not aligned to word boundary") +// Init initializes the message using the given initial size for the data +// buffer, which is re-used across requests or responses encoded or decoded +// using this message object. +func (m *Message) Init(initialBufferSize int) { + if (initialBufferSize % messageWordSize) != 0 { + panic("initial buffer size is not aligned to word boundary") } m.header = make([]byte, messageHeaderSize) - m.body1.Bytes = make([]byte, staticSize) - m.Reset() + m.body.Bytes = make([]byte, initialBufferSize) + m.reset() } // Reset the state of the message so it can be used to encode or decode again. -func (m *Message) Reset() { +func (m *Message) reset() { m.words = 0 m.mtype = 0 m.flags = 0 @@ -51,9 +50,7 @@ func (m *Message) Reset() { for i := 0; i < messageHeaderSize; i++ { m.header[i] = 0 } - m.body1.Offset = 0 - m.body2.Bytes = nil - m.body2.Offset = 0 + m.body.Offset = 0 } // Append a byte slice to the message. @@ -232,11 +229,11 @@ func (m *Message) putNamedValues(values NamedValues) { // Finalize the message by setting the message type and the number // of words in the body (calculated from the body size). func (m *Message) putHeader(mtype uint8) { - if m.body1.Offset <= 0 { + if m.body.Offset <= 0 { panic("static offset is not positive") } - if (m.body1.Offset % messageWordSize) != 0 { + if (m.body.Offset % messageWordSize) != 0 { panic("static body is not aligned") } @@ -244,22 +241,7 @@ func (m *Message) putHeader(mtype uint8) { m.flags = 0 m.extra = 0 - m.words = uint32(m.body1.Offset) / messageWordSize - - if m.body2.Bytes == nil { - m.finalize() - return - } - - if m.body2.Offset <= 0 { - panic("dynamic offset is not positive") - } - - if (m.body2.Offset % messageWordSize) != 0 { - panic("dynamic body is not aligned") - } - - m.words += uint32(m.body2.Offset) / messageWordSize + m.words = uint32(m.body.Offset) / messageWordSize m.finalize() } @@ -276,27 +258,14 @@ func (m *Message) finalize() { } func (m *Message) bufferForPut(size int) *buffer { - if m.body2.Bytes != nil { - if (m.body2.Offset + size) > len(m.body2.Bytes) { - // Grow body2. - // - // TODO: find a good grow strategy. - bytes := make([]byte, m.body2.Offset+size) - copy(bytes, m.body2.Bytes) - m.body2.Bytes = bytes - } - - return &m.body2 + for (m.body.Offset + size) > len(m.body.Bytes) { + // Grow message buffer. + bytes := make([]byte, len(m.body.Bytes)*2) + copy(bytes, m.body.Bytes) + m.body.Bytes = bytes } - if (m.body1.Offset + size) > len(m.body1.Bytes) { - m.body2.Bytes = make([]byte, size) - m.body2.Offset = 0 - - return &m.body2 - } - - return &m.body1 + return &m.body } // Return the message type and its flags. @@ -310,31 +279,6 @@ func (m *Message) getString() string { index := bytes.IndexByte(b.Bytes[b.Offset:], 0) if index == -1 { - // Check if the string overflows in the dynamic buffer. - if b == &m.body1 && m.body2.Bytes != nil { - // Assert that this is the first read of the dynamic buffer. - if m.body2.Offset != 0 { - panic("static buffer read after dynamic buffer one") - } - index = bytes.IndexByte(m.body2.Bytes[0:], 0) - if index != -1 { - // We found the trailing part of the string. - data := b.Bytes[b.Offset:] - data = append(data, m.body2.Bytes[0:index]...) - - index++ - - if trailing := index % messageWordSize; trailing != 0 { - // Account for padding, moving index to the next word boundary. - index += messageWordSize - trailing - } - - m.body1.Offset = len(m.body1.Bytes) - m.body2.Advance(index) - - return string(data) - } - } panic("no string found") } s := string(b.Bytes[b.Offset : b.Offset+index]) @@ -465,31 +409,23 @@ func (m *Message) getFiles() Files { func (m *Message) hasBeenConsumed() bool { size := int(m.words * messageWordSize) - return (m.body1.Offset == size || m.body1.Offset == len(m.body1.Bytes)) && - m.body1.Offset+m.body2.Offset == size + return m.body.Offset == size } func (m *Message) lastByte() byte { size := int(m.words * messageWordSize) - if size > len(m.body1.Bytes) { - size = size - m.body1.Offset - return m.body2.Bytes[size-1] - } - return m.body1.Bytes[size-1] + return m.body.Bytes[size-1] } func (m *Message) bufferForGet() *buffer { size := int(m.words * messageWordSize) - if m.body1.Offset == size || m.body1.Offset == len(m.body1.Bytes) { - // The static body has been exahusted, use the dynamic one. - if m.body1.Offset+m.body2.Offset == size { - err := fmt.Errorf("short message: type=%d words=%d off=%d", m.mtype, m.words, m.body1.Offset) - panic(err) - } - return &m.body2 + // The static body has been exahusted, use the dynamic one. + if m.body.Offset == size { + err := fmt.Errorf("short message: type=%d words=%d off=%d", m.mtype, m.words, m.body.Offset) + panic(err) } - return &m.body1 + return &m.body } // Result holds the result of a statement. @@ -639,7 +575,7 @@ func (r *Rows) Close() error { err = fmt.Errorf("unexpected end of message") } } - r.message.Reset() + r.message.reset() return err } @@ -664,7 +600,7 @@ func (f *Files) Next() (string, []byte) { } func (f *Files) Close() { - f.message.Reset() + f.message.reset() } const ( diff --git a/vendor/github.com/canonical/go-dqlite/internal/protocol/protocol.go b/vendor/github.com/canonical/go-dqlite/internal/protocol/protocol.go index 47774813f4..0a9c8b9e79 100644 --- a/vendor/github.com/canonical/go-dqlite/internal/protocol/protocol.go +++ b/vendor/github.com/canonical/go-dqlite/internal/protocol/protocol.go @@ -13,31 +13,23 @@ import ( // Protocol sends and receive the dqlite message on the wire. type Protocol struct { - version uint64 // Protocol version - conn net.Conn // Underlying network connection. - contextTimeout time.Duration // Default context timeout. - closeCh chan struct{} // Stops the heartbeat when the connection gets closed - mu sync.Mutex // Serialize requests - netErr error // A network error occurred + version uint64 // Protocol version + conn net.Conn // Underlying network connection. + closeCh chan struct{} // Stops the heartbeat when the connection gets closed + mu sync.Mutex // Serialize requests + netErr error // A network error occurred } -func NewProtocol(version uint64, conn net.Conn) *Protocol { +func newProtocol(version uint64, conn net.Conn) *Protocol { protocol := &Protocol{ - version: version, - conn: conn, - closeCh: make(chan struct{}), - contextTimeout: 5 * time.Second, + version: version, + conn: conn, + closeCh: make(chan struct{}), } return protocol } -// SetContextTimeout sets the default context timeout when no deadline is -// provided. -func (p *Protocol) SetContextTimeout(timeout time.Duration) { - p.contextTimeout = timeout -} - // Call invokes a dqlite RPC, sending a request message and receiving a // response message. func (p *Protocol) Call(ctx context.Context, request, response *Message) (err error) { @@ -50,21 +42,22 @@ func (p *Protocol) Call(ctx context.Context, request, response *Message) (err er return p.netErr } - // Honor the ctx deadline, if present, or use a default. - deadline, ok := ctx.Deadline() - if !ok { - deadline = time.Now().Add(p.contextTimeout) + var budget time.Duration + + // Honor the ctx deadline, if present. + if deadline, ok := ctx.Deadline(); ok { + p.conn.SetDeadline(deadline) + budget = time.Until(deadline) + defer p.conn.SetDeadline(time.Time{}) } - p.conn.SetDeadline(deadline) - if err = p.send(request); err != nil { - err = errors.Wrap(err, "failed to send request") + err = errors.Wrapf(err, "send request (budget=%s)", budget) goto err } if err = p.recv(response); err != nil { - err = errors.Wrap(err, "failed to receive response") + err = errors.Wrapf(err, "receive response (budget=%s)", budget) goto err } @@ -91,14 +84,11 @@ func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Me p.mu.Lock() defer p.mu.Unlock() - // Honor the ctx deadline, if present, or use a default. - deadline, ok := ctx.Deadline() - if !ok { - deadline = time.Now().Add(2 * time.Second) + // Honor the ctx deadline, if present. + if deadline, ok := ctx.Deadline(); ok { + p.conn.SetDeadline(deadline) + defer p.conn.SetDeadline(time.Time{}) } - p.conn.SetDeadline(deadline) - - defer request.Reset() EncodeInterrupt(request, 0) @@ -108,12 +98,10 @@ func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Me for { if err := p.recv(response); err != nil { - response.Reset() return errors.Wrap(err, "failed to receive response") } mtype, _ := response.getHeader() - response.Reset() if mtype == ResponseEmpty { break @@ -155,7 +143,7 @@ func (p *Protocol) sendHeader(req *Message) error { } func (p *Protocol) sendBody(req *Message) error { - buf := req.body1.Bytes[:req.body1.Offset] + buf := req.body.Bytes[:req.body.Offset] n, err := p.conn.Write(buf) if err != nil { return errors.Wrap(err, "failed to send static body") @@ -165,24 +153,12 @@ func (p *Protocol) sendBody(req *Message) error { return errors.Wrap(io.ErrShortWrite, "failed to write body") } - if req.body2.Bytes == nil { - return nil - } - - buf = req.body2.Bytes[:req.body2.Offset] - n, err = p.conn.Write(buf) - if err != nil { - return errors.Wrap(err, "failed to send dynamic body") - } - - if n != len(buf) { - return errors.Wrap(io.ErrShortWrite, "failed to write body") - } - return nil } func (p *Protocol) recv(res *Message) error { + res.reset() + if err := p.recvHeader(res); err != nil { return errors.Wrap(err, "failed to receive header") } @@ -209,30 +185,19 @@ func (p *Protocol) recvHeader(res *Message) error { func (p *Protocol) recvBody(res *Message) error { n := int(res.words) * messageWordSize - n1 := n - n2 := 0 - if n1 > len(res.body1.Bytes) { - // We need to allocate the dynamic buffer. - n1 = len(res.body1.Bytes) - n2 = n - n1 + for n > len(res.body.Bytes) { + // Grow message buffer. + bytes := make([]byte, len(res.body.Bytes)*2) + res.body.Bytes = bytes } - buf := res.body1.Bytes[:n1] + buf := res.body.Bytes[:n] if err := p.recvPeek(buf); err != nil { return errors.Wrap(err, "failed to read body") } - if n2 > 0 { - res.body2.Bytes = make([]byte, n2) - res.body2.Offset = 0 - buf = res.body2.Bytes - if err := p.recvPeek(buf); err != nil { - return errors.Wrap(err, "failed to read body") - } - } - return nil } diff --git a/vendor/github.com/canonical/go-dqlite/internal/protocol/request.go b/vendor/github.com/canonical/go-dqlite/internal/protocol/request.go index 1d4797eb23..3927e27a30 100644 --- a/vendor/github.com/canonical/go-dqlite/internal/protocol/request.go +++ b/vendor/github.com/canonical/go-dqlite/internal/protocol/request.go @@ -7,6 +7,7 @@ package protocol // EncodeLeader encodes a Leader request. func EncodeLeader(request *Message) { + request.reset() request.putUint64(0) request.putHeader(RequestLeader) @@ -14,6 +15,7 @@ func EncodeLeader(request *Message) { // EncodeClient encodes a Client request. func EncodeClient(request *Message, id uint64) { + request.reset() request.putUint64(id) request.putHeader(RequestClient) @@ -21,6 +23,7 @@ func EncodeClient(request *Message, id uint64) { // EncodeHeartbeat encodes a Heartbeat request. func EncodeHeartbeat(request *Message, timestamp uint64) { + request.reset() request.putUint64(timestamp) request.putHeader(RequestHeartbeat) @@ -28,6 +31,7 @@ func EncodeHeartbeat(request *Message, timestamp uint64) { // EncodeOpen encodes a Open request. func EncodeOpen(request *Message, name string, flags uint64, vfs string) { + request.reset() request.putString(name) request.putUint64(flags) request.putString(vfs) @@ -37,6 +41,7 @@ func EncodeOpen(request *Message, name string, flags uint64, vfs string) { // EncodePrepare encodes a Prepare request. func EncodePrepare(request *Message, db uint64, sql string) { + request.reset() request.putUint64(db) request.putString(sql) @@ -45,6 +50,7 @@ func EncodePrepare(request *Message, db uint64, sql string) { // EncodeExec encodes a Exec request. func EncodeExec(request *Message, db uint32, stmt uint32, values NamedValues) { + request.reset() request.putUint32(db) request.putUint32(stmt) request.putNamedValues(values) @@ -54,6 +60,7 @@ func EncodeExec(request *Message, db uint32, stmt uint32, values NamedValues) { // EncodeQuery encodes a Query request. func EncodeQuery(request *Message, db uint32, stmt uint32, values NamedValues) { + request.reset() request.putUint32(db) request.putUint32(stmt) request.putNamedValues(values) @@ -63,6 +70,7 @@ func EncodeQuery(request *Message, db uint32, stmt uint32, values NamedValues) { // EncodeFinalize encodes a Finalize request. func EncodeFinalize(request *Message, db uint32, stmt uint32) { + request.reset() request.putUint32(db) request.putUint32(stmt) @@ -71,6 +79,7 @@ func EncodeFinalize(request *Message, db uint32, stmt uint32) { // EncodeExecSQL encodes a ExecSQL request. func EncodeExecSQL(request *Message, db uint64, sql string, values NamedValues) { + request.reset() request.putUint64(db) request.putString(sql) request.putNamedValues(values) @@ -80,6 +89,7 @@ func EncodeExecSQL(request *Message, db uint64, sql string, values NamedValues) // EncodeQuerySQL encodes a QuerySQL request. func EncodeQuerySQL(request *Message, db uint64, sql string, values NamedValues) { + request.reset() request.putUint64(db) request.putString(sql) request.putNamedValues(values) @@ -89,6 +99,7 @@ func EncodeQuerySQL(request *Message, db uint64, sql string, values NamedValues) // EncodeInterrupt encodes a Interrupt request. func EncodeInterrupt(request *Message, db uint64) { + request.reset() request.putUint64(db) request.putHeader(RequestInterrupt) @@ -96,6 +107,7 @@ func EncodeInterrupt(request *Message, db uint64) { // EncodeAdd encodes a Add request. func EncodeAdd(request *Message, id uint64, address string) { + request.reset() request.putUint64(id) request.putString(address) @@ -104,6 +116,7 @@ func EncodeAdd(request *Message, id uint64, address string) { // EncodeAssign encodes a Assign request. func EncodeAssign(request *Message, id uint64, role uint64) { + request.reset() request.putUint64(id) request.putUint64(role) @@ -112,6 +125,7 @@ func EncodeAssign(request *Message, id uint64, role uint64) { // EncodeRemove encodes a Remove request. func EncodeRemove(request *Message, id uint64) { + request.reset() request.putUint64(id) request.putHeader(RequestRemove) @@ -119,6 +133,7 @@ func EncodeRemove(request *Message, id uint64) { // EncodeDump encodes a Dump request. func EncodeDump(request *Message, name string) { + request.reset() request.putString(name) request.putHeader(RequestDump) @@ -126,6 +141,7 @@ func EncodeDump(request *Message, name string) { // EncodeCluster encodes a Cluster request. func EncodeCluster(request *Message, format uint64) { + request.reset() request.putUint64(format) request.putHeader(RequestCluster) @@ -133,6 +149,7 @@ func EncodeCluster(request *Message, format uint64) { // EncodeTransfer encodes a Transfer request. func EncodeTransfer(request *Message, id uint64) { + request.reset() request.putUint64(id) request.putHeader(RequestTransfer) diff --git a/vendor/github.com/canonical/go-dqlite/internal/protocol/schema.sh b/vendor/github.com/canonical/go-dqlite/internal/protocol/schema.sh index 1e7b53f4c6..94299b0fa3 100644 --- a/vendor/github.com/canonical/go-dqlite/internal/protocol/schema.sh +++ b/vendor/github.com/canonical/go-dqlite/internal/protocol/schema.sh @@ -54,6 +54,7 @@ if [ "$entity" = "--request" ]; then // Encode${cmd} encodes a $cmd request. func Encode${cmd}(request *Message${args}) { + request.reset() EOF for i in "${@}" diff --git a/vendor/github.com/canonical/go-dqlite/node.go b/vendor/github.com/canonical/go-dqlite/node.go index c77061744e..4f41346009 100644 --- a/vendor/github.com/canonical/go-dqlite/node.go +++ b/vendor/github.com/canonical/go-dqlite/node.go @@ -118,6 +118,16 @@ func (s *Node) Close() error { return nil } +// BootstrapID is a magic ID that should be used for the fist node in a +// cluster. Alternatively ID 1 can be used as well. +const BootstrapID = 0x2dc171858c3155be + +// GenerateID generates a unique ID for a new node, based on a hash of its +// address and the current time. +func GenerateID(address string) uint64 { + return bindings.GenerateID(address) +} + // Create a options object with sane defaults. func defaultOptions() *options { return &options{ diff --git a/vendor/modules.txt b/vendor/modules.txt index c3567eadea..0b40a6487a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -135,7 +135,7 @@ github.com/blang/semver github.com/bronze1man/goStrongswanVici # github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 github.com/buger/jsonparser -# github.com/canonical/go-dqlite v1.3.0 +# github.com/canonical/go-dqlite v1.5.1 github.com/canonical/go-dqlite github.com/canonical/go-dqlite/client github.com/canonical/go-dqlite/driver @@ -730,7 +730,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1 github.com/rancher/helm-controller/pkg/helm -# github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129 +# github.com/rancher/kine v0.4.0 github.com/rancher/kine/pkg/broadcaster github.com/rancher/kine/pkg/client github.com/rancher/kine/pkg/drivers/dqlite