update dqlite-build and kine

pull/1672/head
galal-hussein 2020-04-23 22:34:44 +02:00
parent 37e91dc3a1
commit dae4a92091
24 changed files with 502 additions and 370 deletions

View File

@ -29,7 +29,7 @@ ENV SELINUX $SELINUX
ARG DQLITE=true ARG DQLITE=true
ENV DQLITE $DQLITE ENV DQLITE $DQLITE
COPY --from=rancher/dqlite-build:v1.3.1-r1 /dist/artifacts /usr/src/ COPY --from=rancher/dqlite-build:v1.4.1-r1 /dist/artifacts /usr/src/
RUN if [ "$DQLITE" = true ]; then \ RUN if [ "$DQLITE" = true ]; then \
tar xzf /usr/src/dqlite.tgz -C / && \ tar xzf /usr/src/dqlite.tgz -C / && \
apk add --allow-untrusted /usr/local/packages/*.apk \ apk add --allow-untrusted /usr/local/packages/*.apk \

4
go.mod
View File

@ -67,7 +67,7 @@ require (
github.com/bhendo/go-powershell v0.0.0-20190719160123-219e7fb4e41e // indirect github.com/bhendo/go-powershell v0.0.0-20190719160123-219e7fb4e41e // indirect
github.com/bronze1man/goStrongswanVici v0.0.0-20190828090544-27d02f80ba40 // indirect github.com/bronze1man/goStrongswanVici v0.0.0-20190828090544-27d02f80ba40 // indirect
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/canonical/go-dqlite v1.3.0 github.com/canonical/go-dqlite v1.5.1
github.com/containerd/cgroups v0.0.0-00010101000000-000000000000 // indirect github.com/containerd/cgroups v0.0.0-00010101000000-000000000000 // indirect
github.com/containerd/containerd v1.3.0-beta.2.0.20190828155532-0293cbd26c69 github.com/containerd/containerd v1.3.0-beta.2.0.20190828155532-0293cbd26c69
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 // indirect github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 // indirect
@ -102,7 +102,7 @@ require (
github.com/rakelkar/gonetsh v0.0.0-20190719023240-501daadcadf8 // indirect github.com/rakelkar/gonetsh v0.0.0-20190719023240-501daadcadf8 // indirect
github.com/rancher/dynamiclistener v0.2.0 github.com/rancher/dynamiclistener v0.2.0
github.com/rancher/helm-controller v0.5.0 github.com/rancher/helm-controller v0.5.0
github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129 github.com/rancher/kine v0.4.0
github.com/rancher/remotedialer v0.2.0 github.com/rancher/remotedialer v0.2.0
github.com/rancher/wrangler v0.6.1 github.com/rancher/wrangler v0.6.1
github.com/rancher/wrangler-api v0.6.0 github.com/rancher/wrangler-api v0.6.0

4
go.sum
View File

@ -99,6 +99,8 @@ github.com/caddyserver/caddy v1.0.3/go.mod h1:G+ouvOY32gENkJC+jhgl62TyhvqEsFaDiZ
github.com/canonical/go-dqlite v1.2.0/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y= github.com/canonical/go-dqlite v1.2.0/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y=
github.com/canonical/go-dqlite v1.3.0 h1:c+7eGZfh0K7yCmGrBkNRGZdY8R8+2jSSkz6Zr3YCjJE= github.com/canonical/go-dqlite v1.3.0 h1:c+7eGZfh0K7yCmGrBkNRGZdY8R8+2jSSkz6Zr3YCjJE=
github.com/canonical/go-dqlite v1.3.0/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y= github.com/canonical/go-dqlite v1.3.0/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y=
github.com/canonical/go-dqlite v1.5.1 h1:1YjtIrFsC1A3XlgsX38ARAiKhvkZS63PqsEd8z3T4yU=
github.com/canonical/go-dqlite v1.5.1/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y=
github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/prettybench v0.0.0-20150116022406-03b8cfe5406c/go.mod h1:Xe6ZsFhtM8HrDku0pxJ3/Lr51rwykrzgFwpmTzleatY= github.com/cespare/prettybench v0.0.0-20150116022406-03b8cfe5406c/go.mod h1:Xe6ZsFhtM8HrDku0pxJ3/Lr51rwykrzgFwpmTzleatY=
@ -650,6 +652,8 @@ github.com/rancher/kine v0.3.5 h1:Tm4eOtejpnzs1WFBrXj76lCLvX9czLlTkgqUk9luCQk=
github.com/rancher/kine v0.3.5/go.mod h1:xEMl0tLCva9/9me7mXJ3m9Vo6yqHgC4OU3NiK4CPrGQ= github.com/rancher/kine v0.3.5/go.mod h1:xEMl0tLCva9/9me7mXJ3m9Vo6yqHgC4OU3NiK4CPrGQ=
github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129 h1:4HYvCG8+pfkBlpYLv/5ZOdxagg3jTszMOqMjamMQ0hA= github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129 h1:4HYvCG8+pfkBlpYLv/5ZOdxagg3jTszMOqMjamMQ0hA=
github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129/go.mod h1:xEMl0tLCva9/9me7mXJ3m9Vo6yqHgC4OU3NiK4CPrGQ= github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129/go.mod h1:xEMl0tLCva9/9me7mXJ3m9Vo6yqHgC4OU3NiK4CPrGQ=
github.com/rancher/kine v0.4.0 h1:1IhWy3TzjExG8xnj46eyUEWdzqNAD1WrgL4eEBKm6Uc=
github.com/rancher/kine v0.4.0/go.mod h1:IImtCJ68AIkE+VY/kUI0NkyJL5q5WzO8QvMsSXqbrpA=
github.com/rancher/kubernetes v1.18.2-k3s.1 h1:LhWNObWF7dL/+T57LkYpuRKtsCBpt0P5G6dRVFG+Ncs= github.com/rancher/kubernetes v1.18.2-k3s.1 h1:LhWNObWF7dL/+T57LkYpuRKtsCBpt0P5G6dRVFG+Ncs=
github.com/rancher/kubernetes v1.18.2-k3s.1/go.mod h1:z8xjOOO1Ljz+TaHpOxVGC7cxtF32TesIamoQ+BZrVS0= github.com/rancher/kubernetes v1.18.2-k3s.1/go.mod h1:z8xjOOO1Ljz+TaHpOxVGC7cxtF32TesIamoQ+BZrVS0=
github.com/rancher/kubernetes/staging/src/k8s.io/api v1.18.2-k3s.1 h1:tYDY9g8+xLwUcsG9T6Xg7cBkO/vgU6yv7cQKqUN6NDE= github.com/rancher/kubernetes/staging/src/k8s.io/api v1.18.2-k3s.1 h1:tYDY9g8+xLwUcsG9T6Xg7cBkO/vgU6yv7cQKqUN6NDE=

View File

@ -1,4 +1,6 @@
.sqlite .sqlite
cmd/dqlite/dqlite
cmd/dqlite-demo/dqlite-demo
demo demo
profile.coverprofile profile.coverprofile
overalls.coverprofile overalls.coverprofile

View File

@ -10,9 +10,32 @@ Usage
The best way to understand how to use the ```go-dqlite``` package is probably by The best way to understand how to use the ```go-dqlite``` package is probably by
looking at the source code of the [demo looking at the source code of the [demo
program](https://github.com/canonical/go-dqlite/tree/master/cmd/dqlite-demo) and program](https://github.com/canonical/go-dqlite/tree/master/cmd/dqlite-demo/main.go) and
use it as example. use it as example.
In general your application will use code such as:
```go
dir := "/path/to/data/directory"
address := "1.2.3.4:666" // Unique node address
cluster := []string{...} // Optional list of existing nodes, when starting a new node
app, err := app.New(dir, app.WithAddress(address), app.WithCluster(cluster))
if err != nil {
// ...
}
db, err := app.Open(context.Background(), "my-database")
if err != nil {
// ...
}
// db is a *sql.DB object
if _, err := db.Exec("CREATE TABLE my_table (n INT)"); err != nil
// ...
}
```
Build Build
----- -----
@ -50,42 +73,61 @@ go install -tags libsqlite3 ./cmd/dqlite-demo
from the top-level directory of this repository. from the top-level directory of this repository.
Once the ```dqlite-demo``` binary is installed, start three nodes of the demo This builds a demo dqlite application, which exposes a simple key/value store
application, respectively with IDs ```1```, ```2,``` and ```3```: over an HTTP API.
Once the `dqlite-demo` binary is installed (normally under `~/go/bin`),
start three nodes of the demo application:
```bash ```bash
dqlite-demo start 1 & dqlite-demo --api 127.0.0.1:8001 --db 127.0.0.1:9001 &
dqlite-demo start 2 & dqlite-demo --api 127.0.0.1:8002 --db 127.0.0.1:9002 --join 127.0.0.1:9001 &
dqlite-demo start 3 & dqlite-demo --api 127.0.0.1:8003 --db 127.0.0.1:9003 --join 127.0.0.1:9001 &
``` ```
The node with ID ```1``` automatically becomes the leader of a single node The `--api` flag tells the demo program where to expose its HTTP API.
cluster, while the nodes with IDs ```2``` and ```3``` are waiting to be notified
what cluster they belong to. Let's make nodes ```2``` and ```3``` join the The `--db` flag tells the demo program to use the given address for internal
cluster: database replication.
The `--join` flag is optional and should be used only for additional nodes after
the first one. It informs them about the existing cluster, so they can
automatically join it.
Now we can start using the cluster. Let's insert a key pair:
```bash ```bash
dqlite-demo add 2 curl -X PUT -d my-key http://127.0.0.1:8001/my-value
dqlite-demo add 3
```
Now we can start using the cluster. The demo application is just a simple
key/value store that stores data in a SQLite table. Let's insert a key pair:
```bash
dqlite-demo update my-key my-value
``` ```
and then retrive it from the database: and then retrive it from the database:
```bash ```bash
dqlite-demo query my-key curl http://127.0.0.1:8001/my-value
``` ```
Currently node ```1``` is the leader. If we stop it and then try to query the Currently the first node is the leader. If we stop it and then try to query the
key again we'll notice that the ```query``` command hangs for a bit waiting for key again curl will fail, but we can simply change the endpoint to another node
the failover to occur and for another node to step up as leader: and things will work since an automatic failover has taken place:
```bash
kill -TERM %1; curl http://127.0.0.1:8002/my-value
```
Shell
------
A basic SQLite-like dqlite shell can be built with:
``` ```
kill -TERM %1; sleep 0.1; dqlite-demo query my-key; dqlite-demo cluster go install -tags libsqlite3 ./cmd/dqlite
``` ```
You can test it with the `dqlite-demo` with:
```
dqlite -s 127.0.0.1:9001
```
It supports normal SQL queries plus the special `.cluster` and `.leader`
commands to inspect the cluster members and the current leader.

View File

@ -2,10 +2,6 @@ package client
import ( import (
"context" "context"
"encoding/binary"
"io"
"net"
"strings"
"github.com/canonical/go-dqlite/internal/protocol" "github.com/canonical/go-dqlite/internal/protocol"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -57,22 +53,13 @@ func New(ctx context.Context, address string, options ...Option) (*Client, error
return nil, errors.Wrap(err, "failed to establish network connection") return nil, errors.Wrap(err, "failed to establish network connection")
} }
// Latest protocol version. protocol, err := protocol.Handshake(ctx, conn, protocol.VersionOne)
proto := make([]byte, 8)
binary.LittleEndian.PutUint64(proto, protocol.VersionOne)
// Perform the protocol handshake.
n, err := conn.Write(proto)
if err != nil { if err != nil {
conn.Close() conn.Close()
return nil, errors.Wrap(err, "failed to send handshake") return nil, err
}
if n != 8 {
conn.Close()
return nil, errors.Wrap(io.ErrShortWrite, "failed to send handshake")
} }
client := &Client{protocol: protocol.NewProtocol(protocol.VersionOne, conn)} client := &Client{protocol: protocol}
return client, nil return client, nil
} }
@ -274,10 +261,3 @@ func defaultOptions() *options {
LogFunc: DefaultLogFunc, LogFunc: DefaultLogFunc,
} }
} }
func DefaultDialFunc(ctx context.Context, address string) (net.Conn, error) {
if strings.HasPrefix(address, "@") {
return protocol.UnixDial(ctx, address)
}
return protocol.TCPDial(ctx, address)
}

33
vendor/github.com/canonical/go-dqlite/client/dial.go generated vendored Normal file
View File

@ -0,0 +1,33 @@
package client
import (
"context"
"crypto/tls"
"net"
"strings"
"github.com/canonical/go-dqlite/internal/protocol"
)
// DefaultDialFunc is the default dial function, which can handle plain TCP and
// Unix socket endpoints. You can customize it with WithDialFunc()
func DefaultDialFunc(ctx context.Context, address string) (net.Conn, error) {
if strings.HasPrefix(address, "@") {
return protocol.UnixDial(ctx, address)
}
return protocol.TCPDial(ctx, address)
}
// DialFuncWithTLS returns a dial function that uses TLS encryption.
//
// The given dial function will be used to establish the network connection,
// and the given TLS config will be used for encryption.
func DialFuncWithTLS(dial DialFunc, config *tls.Config) DialFunc {
return func(ctx context.Context, addr string) (net.Conn, error) {
conn, err := dial(ctx, addr)
if err != nil {
return nil, err
}
return tls.Client(conn, config), nil
}
}

View File

@ -2,10 +2,7 @@ package client
import ( import (
"context" "context"
"time"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/canonical/go-dqlite/internal/protocol" "github.com/canonical/go-dqlite/internal/protocol"
) )
@ -19,9 +16,6 @@ func FindLeader(ctx context.Context, store NodeStore, options ...Option) (*Clien
config := protocol.Config{ config := protocol.Config{
Dial: o.DialFunc, Dial: o.DialFunc,
AttemptTimeout: time.Second,
RetryStrategies: []strategy.Strategy{
strategy.Backoff(backoff.BinaryExponential(time.Millisecond))},
} }
connector := protocol.NewConnector(0, store, config, o.LogFunc) connector := protocol.NewConnector(0, store, config, o.LogFunc)
protocol, err := connector.Connect(ctx) protocol, err := connector.Connect(ctx)

View File

@ -1,10 +1,6 @@
package client package client
import ( import (
"fmt"
"log"
"os"
"github.com/canonical/go-dqlite/internal/logging" "github.com/canonical/go-dqlite/internal/logging"
) )
@ -22,11 +18,5 @@ const (
LogError = logging.Error LogError = logging.Error
) )
var ( // DefaultLogFunc doesn't emit any message.
logger = log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds) func DefaultLogFunc(l LogLevel, format string, a ...interface{}) {}
)
// DefaultLogFunc emits messages using the stdlib's logger.
func DefaultLogFunc(l LogLevel, format string, a ...interface{}) {
logger.Output(2, fmt.Sprintf("[%s]: %s", l.String(), fmt.Sprintf(format, a...)))
}

View File

@ -4,7 +4,11 @@ import (
"context" "context"
"database/sql" "database/sql"
"fmt" "fmt"
"io/ioutil"
"os"
"sync"
"github.com/ghodss/yaml"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/canonical/go-dqlite/internal/protocol" "github.com/canonical/go-dqlite/internal/protocol"
@ -163,3 +167,65 @@ func (d *DatabaseNodeStore) Set(ctx context.Context, servers []NodeInfo) error {
return nil return nil
} }
// Persists a list addresses of dqlite nodes in a YAML file.
type YamlNodeStore struct {
path string
servers []NodeInfo
mu sync.RWMutex
}
// NewYamlNodeStore creates a new YamlNodeStore backed by the given YAML file.
func NewYamlNodeStore(path string) (*YamlNodeStore, error) {
servers := []NodeInfo{}
_, err := os.Stat(path)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
} else {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
if err := yaml.Unmarshal(data, &servers); err != nil {
return nil, err
}
}
store := &YamlNodeStore{
path: path,
servers: servers,
}
return store, nil
}
// Get the current servers.
func (s *YamlNodeStore) Get(ctx context.Context) ([]NodeInfo, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.servers, nil
}
// Set the servers addresses.
func (s *YamlNodeStore) Set(ctx context.Context, servers []NodeInfo) error {
s.mu.Lock()
defer s.mu.Unlock()
data, err := yaml.Marshal(servers)
if err != nil {
return err
}
if err := ioutil.WriteFile(s.path, data, 0600); err != nil {
return err
}
s.servers = servers
return nil
}

View File

@ -5,6 +5,7 @@ import (
"os" "os"
"github.com/canonical/go-dqlite/internal/bindings" "github.com/canonical/go-dqlite/internal/bindings"
"github.com/canonical/go-dqlite/internal/protocol"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -26,7 +27,7 @@ import (
// from setting Single-thread mode at all. // from setting Single-thread mode at all.
func ConfigMultiThread() error { func ConfigMultiThread() error {
if err := bindings.ConfigMultiThread(); err != nil { if err := bindings.ConfigMultiThread(); err != nil {
if err, ok := err.(bindings.Error); ok && err.Code == 21 /* SQLITE_MISUSE */ { if err, ok := err.(protocol.Error); ok && err.Code == 21 /* SQLITE_MISUSE */ {
return fmt.Errorf("SQLite is already initialized") return fmt.Errorf("SQLite is already initialized")
} }
return errors.Wrap(err, "unknown error") return errors.Wrap(err, "unknown error")

View File

@ -24,12 +24,9 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/canonical/go-dqlite/client" "github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/internal/bindings"
"github.com/canonical/go-dqlite/internal/protocol" "github.com/canonical/go-dqlite/internal/protocol"
) )
@ -44,7 +41,7 @@ type Driver struct {
} }
// Error is returned in case of database errors. // Error is returned in case of database errors.
type Error = bindings.Error type Error = protocol.Error
// Error codes. Values here mostly overlap with native SQLite codes. // Error codes. Values here mostly overlap with native SQLite codes.
const ( const (
@ -87,6 +84,9 @@ func WithDialFunc(dial DialFunc) Option {
// WithConnectionTimeout sets the connection timeout. // WithConnectionTimeout sets the connection timeout.
// //
// If not used, the default is 5 seconds. // If not used, the default is 5 seconds.
//
// DEPRECATED: Connection cancellation is supported via the driver.Connector
// interface, which is used internally by the stdlib sql package.
func WithConnectionTimeout(timeout time.Duration) Option { func WithConnectionTimeout(timeout time.Duration) Option {
return func(options *options) { return func(options *options) {
options.ConnectionTimeout = timeout options.ConnectionTimeout = timeout
@ -96,7 +96,7 @@ func WithConnectionTimeout(timeout time.Duration) Option {
// WithConnectionBackoffFactor sets the exponential backoff factor for retrying // WithConnectionBackoffFactor sets the exponential backoff factor for retrying
// failed connection attempts. // failed connection attempts.
// //
// If not used, the default is 50 milliseconds. // If not used, the default is 100 milliseconds.
func WithConnectionBackoffFactor(factor time.Duration) Option { func WithConnectionBackoffFactor(factor time.Duration) Option {
return func(options *options) { return func(options *options) {
options.ConnectionBackoffFactor = factor options.ConnectionBackoffFactor = factor
@ -113,7 +113,28 @@ func WithConnectionBackoffCap(cap time.Duration) Option {
} }
} }
// WithAttemptTimeout sets the timeout for each individual connection attempt .
//
// If not used, the default is 60 seconds.
func WithAttemptTimeout(timeout time.Duration) Option {
return func(options *options) {
options.AttemptTimeout = timeout
}
}
// WithRetryLimit sets the maximum number of connection retries.
//
// If not used, the default is 0 (unlimited retries)
func WithRetryLimit(limit uint) Option {
return func(options *options) {
options.RetryLimit = limit
}
}
// WithContext sets a global cancellation context. // WithContext sets a global cancellation context.
//
// DEPRECATED: This API is no a no-op. Users should explicitly pass a context
// if they wish to cancel their requests.
func WithContext(context context.Context) Option { func WithContext(context context.Context) Option {
return func(options *options) { return func(options *options) {
options.Context = context options.Context = context
@ -123,7 +144,8 @@ func WithContext(context context.Context) Option {
// WithContextTimeout sets the default client context timeout when no context // WithContextTimeout sets the default client context timeout when no context
// deadline is provided. // deadline is provided.
// //
// If not used, the default is 5 seconds. // DEPRECATED: This API is no a no-op. Users should explicitly pass a context
// if they wish to cancel their requests.
func WithContextTimeout(timeout time.Duration) Option { func WithContextTimeout(timeout time.Duration) Option {
return func(options *options) { return func(options *options) {
options.ContextTimeout = timeout options.ContextTimeout = timeout
@ -148,13 +170,10 @@ func New(store client.NodeStore, options ...Option) (*Driver, error) {
} }
driver.clientConfig.Dial = o.Dial driver.clientConfig.Dial = o.Dial
driver.clientConfig.AttemptTimeout = 5 * time.Second driver.clientConfig.AttemptTimeout = o.AttemptTimeout
driver.clientConfig.RetryStrategies = []strategy.Strategy{ driver.clientConfig.BackoffFactor = o.ConnectionBackoffFactor
driverConnectionRetryStrategy( driver.clientConfig.BackoffCap = o.ConnectionBackoffCap
o.ConnectionBackoffFactor, driver.clientConfig.RetryLimit = o.RetryLimit
o.ConnectionBackoffCap,
),
}
return driver, nil return driver, nil
} }
@ -163,10 +182,12 @@ func New(store client.NodeStore, options ...Option) (*Driver, error) {
type options struct { type options struct {
Log client.LogFunc Log client.LogFunc
Dial protocol.DialFunc Dial protocol.DialFunc
AttemptTimeout time.Duration
ConnectionTimeout time.Duration ConnectionTimeout time.Duration
ContextTimeout time.Duration ContextTimeout time.Duration
ConnectionBackoffFactor time.Duration ConnectionBackoffFactor time.Duration
ConnectionBackoffCap time.Duration ConnectionBackoffCap time.Duration
RetryLimit uint
Context context.Context Context context.Context
} }
@ -175,51 +196,34 @@ func defaultOptions() *options {
return &options{ return &options{
Log: client.DefaultLogFunc, Log: client.DefaultLogFunc,
Dial: client.DefaultDialFunc, Dial: client.DefaultDialFunc,
ConnectionTimeout: 15 * time.Second,
ContextTimeout: 2 * time.Second,
ConnectionBackoffFactor: 50 * time.Millisecond,
ConnectionBackoffCap: time.Second,
Context: context.Background(),
} }
} }
// Return a retry strategy with jittered exponential backoff, capped at the // A Connector represents a driver in a fixed configuration and can create any
// given amount of time. // number of equivalent Conns for use by multiple goroutines.
func driverConnectionRetryStrategy(factor, cap time.Duration) strategy.Strategy { type Connector struct {
backoff := backoff.BinaryExponential(factor) uri string
driver *Driver
return func(attempt uint) bool {
if attempt > 0 {
duration := backoff(attempt)
if duration > cap {
duration = cap
}
time.Sleep(duration)
} }
return true // Connect returns a connection to the database.
} func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) {
if c.driver.context != nil {
ctx = c.driver.context
} }
// Open establishes a new connection to a SQLite database on the dqlite server. if c.driver.connectionTimeout != 0 {
// var cancel func()
// The given name must be a pure file name without any directory segment, ctx, cancel = context.WithTimeout(ctx, c.driver.connectionTimeout)
// dqlite will connect to a database with that name in its data directory.
//
// Query parameters are always valid except for "mode=memory".
//
// If this node is not the leader, or the leader is unknown an ErrNotLeader
// error is returned.
func (d *Driver) Open(uri string) (driver.Conn, error) {
ctx, cancel := context.WithTimeout(d.context, d.connectionTimeout)
defer cancel() defer cancel()
}
// TODO: generate a client ID. // TODO: generate a client ID.
connector := protocol.NewConnector(0, d.store, d.clientConfig, d.log) connector := protocol.NewConnector(0, c.driver.store, c.driver.clientConfig, c.driver.log)
conn := &Conn{ conn := &Conn{
log: d.log, log: c.driver.log,
contextTimeout: d.contextTimeout, contextTimeout: c.driver.contextTimeout,
} }
var err error var err error
@ -227,15 +231,11 @@ func (d *Driver) Open(uri string) (driver.Conn, error) {
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to create dqlite connection") return nil, errors.Wrap(err, "failed to create dqlite connection")
} }
conn.protocol.SetContextTimeout(d.contextTimeout)
conn.request.Init(4096) conn.request.Init(4096)
conn.response.Init(4096) conn.response.Init(4096)
defer conn.request.Reset() protocol.EncodeOpen(&conn.request, c.uri, 0, "volatile")
defer conn.response.Reset()
protocol.EncodeOpen(&conn.request, uri, 0, "volatile")
if err := conn.protocol.Call(ctx, &conn.request, &conn.response); err != nil { if err := conn.protocol.Call(ctx, &conn.request, &conn.response); err != nil {
conn.protocol.Close() conn.protocol.Close()
@ -251,11 +251,45 @@ func (d *Driver) Open(uri string) (driver.Conn, error) {
return conn, nil return conn, nil
} }
// Driver returns the underlying Driver of the Connector,
func (c *Connector) Driver() driver.Driver {
return c.driver
}
// OpenConnector must parse the name in the same format that Driver.Open
// parses the name parameter.
func (d *Driver) OpenConnector(name string) (driver.Connector, error) {
connector := &Connector{
uri: name,
driver: d,
}
return connector, nil
}
// Open establishes a new connection to a SQLite database on the dqlite server.
//
// The given name must be a pure file name without any directory segment,
// dqlite will connect to a database with that name in its data directory.
//
// Query parameters are always valid except for "mode=memory".
//
// If this node is not the leader, or the leader is unknown an ErrNotLeader
// error is returned.
func (d *Driver) Open(uri string) (driver.Conn, error) {
connector, err := d.OpenConnector(uri)
if err != nil {
return nil, err
}
return connector.Connect(context.Background())
}
// SetContextTimeout sets the default client timeout when no context deadline // SetContextTimeout sets the default client timeout when no context deadline
// is provided. // is provided.
func (d *Driver) SetContextTimeout(timeout time.Duration) { //
d.contextTimeout = timeout // DEPRECATED: This API is no a no-op. Users should explicitly pass a context
} // if they wish to cancel their requests.
func (d *Driver) SetContextTimeout(timeout time.Duration) {}
// ErrNoAvailableLeader is returned as root cause of Open() if there's no // ErrNoAvailableLeader is returned as root cause of Open() if there's no
// leader available in the cluster. // leader available in the cluster.
@ -275,9 +309,6 @@ type Conn struct {
// context is for the preparation of the statement, it must not store the // context is for the preparation of the statement, it must not store the
// context within the statement itself. // context within the statement itself.
func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
defer c.request.Reset()
defer c.response.Reset()
stmt := &Stmt{ stmt := &Stmt{
protocol: c.protocol, protocol: c.protocol,
request: &c.request, request: &c.request,
@ -306,9 +337,6 @@ func (c *Conn) Prepare(query string) (driver.Stmt, error) {
// ExecContext is an optional interface that may be implemented by a Conn. // ExecContext is an optional interface that may be implemented by a Conn.
func (c *Conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { func (c *Conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
defer c.request.Reset()
defer c.response.Reset()
protocol.EncodeExecSQL(&c.request, uint64(c.id), query, args) protocol.EncodeExecSQL(&c.request, uint64(c.id), query, args)
if err := c.protocol.Call(ctx, &c.request, &c.response); err != nil { if err := c.protocol.Call(ctx, &c.request, &c.response); err != nil {
@ -330,18 +358,14 @@ func (c *Conn) Query(query string, args []driver.Value) (driver.Rows, error) {
// QueryContext is an optional interface that may be implemented by a Conn. // QueryContext is an optional interface that may be implemented by a Conn.
func (c *Conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { func (c *Conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
defer c.request.Reset()
protocol.EncodeQuerySQL(&c.request, uint64(c.id), query, args) protocol.EncodeQuerySQL(&c.request, uint64(c.id), query, args)
if err := c.protocol.Call(ctx, &c.request, &c.response); err != nil { if err := c.protocol.Call(ctx, &c.request, &c.response); err != nil {
c.response.Reset()
return nil, driverError(err) return nil, driverError(err)
} }
rows, err := protocol.DecodeRows(&c.response) rows, err := protocol.DecodeRows(&c.response)
if err != nil { if err != nil {
c.response.Reset()
return nil, driverError(err) return nil, driverError(err)
} }
@ -391,7 +415,15 @@ func (c *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, e
// //
// Deprecated: Drivers should implement ConnBeginTx instead (or additionally). // Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
func (c *Conn) Begin() (driver.Tx, error) { func (c *Conn) Begin() (driver.Tx, error) {
return c.BeginTx(context.Background(), driver.TxOptions{}) ctx := context.Background()
if c.contextTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(context.Background(), c.contextTimeout)
defer cancel()
}
return c.BeginTx(ctx, driver.TxOptions{})
} }
// Tx is a transaction. // Tx is a transaction.
@ -401,8 +433,7 @@ type Tx struct {
// Commit the transaction. // Commit the transaction.
func (tx *Tx) Commit() error { func (tx *Tx) Commit() error {
ctx, cancel := context.WithTimeout(context.Background(), tx.conn.contextTimeout) ctx := context.Background()
defer cancel()
if _, err := tx.conn.ExecContext(ctx, "COMMIT", nil); err != nil { if _, err := tx.conn.ExecContext(ctx, "COMMIT", nil); err != nil {
return driverError(err) return driverError(err)
@ -413,8 +444,7 @@ func (tx *Tx) Commit() error {
// Rollback the transaction. // Rollback the transaction.
func (tx *Tx) Rollback() error { func (tx *Tx) Rollback() error {
ctx, cancel := context.WithTimeout(context.Background(), tx.conn.contextTimeout) ctx := context.Background()
defer cancel()
if _, err := tx.conn.ExecContext(ctx, "ROLLBACK", nil); err != nil { if _, err := tx.conn.ExecContext(ctx, "ROLLBACK", nil); err != nil {
return driverError(err) return driverError(err)
@ -436,9 +466,6 @@ type Stmt struct {
// Close closes the statement. // Close closes the statement.
func (s *Stmt) Close() error { func (s *Stmt) Close() error {
defer s.request.Reset()
defer s.response.Reset()
protocol.EncodeFinalize(s.request, s.db, s.id) protocol.EncodeFinalize(s.request, s.db, s.id)
ctx := context.Background() ctx := context.Background()
@ -464,9 +491,6 @@ func (s *Stmt) NumInput() int {
// //
// ExecContext must honor the context timeout and return when it is canceled. // ExecContext must honor the context timeout and return when it is canceled.
func (s *Stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) { func (s *Stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
defer s.request.Reset()
defer s.response.Reset()
protocol.EncodeExec(s.request, s.db, s.id, args) protocol.EncodeExec(s.request, s.db, s.id, args)
if err := s.protocol.Call(ctx, s.request, s.response); err != nil { if err := s.protocol.Call(ctx, s.request, s.response); err != nil {
@ -491,22 +515,14 @@ func (s *Stmt) Exec(args []driver.Value) (driver.Result, error) {
// //
// QueryContext must honor the context timeout and return when it is canceled. // QueryContext must honor the context timeout and return when it is canceled.
func (s *Stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) { func (s *Stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
defer s.request.Reset()
// FIXME: this shouldn't be needed but we have hit a few panics
// probably due to the response object not being fully reset.
s.response.Reset()
protocol.EncodeQuery(s.request, s.db, s.id, args) protocol.EncodeQuery(s.request, s.db, s.id, args)
if err := s.protocol.Call(ctx, s.request, s.response); err != nil { if err := s.protocol.Call(ctx, s.request, s.response); err != nil {
s.response.Reset()
return nil, driverError(err) return nil, driverError(err)
} }
rows, err := protocol.DecodeRows(s.response) rows, err := protocol.DecodeRows(s.response)
if err != nil { if err != nil {
s.response.Reset()
return nil, driverError(err) return nil, driverError(err)
} }

View File

@ -1,19 +0,0 @@
package bindings
/*
#include <sqlite3.h>
*/
import "C"
// Error holds information about a SQLite error.
type Error struct {
Code int
Message string
}
func (e Error) Error() string {
if e.Message != "" {
return e.Message
}
return C.GoString(C.sqlite3_errstr(C.int(e.Code)))
}

View File

@ -90,14 +90,14 @@ func init() {
func ConfigSingleThread() error { func ConfigSingleThread() error {
if rc := C.sqlite3ConfigSingleThread(); rc != 0 { if rc := C.sqlite3ConfigSingleThread(); rc != 0 {
return Error{Code: int(rc)} return protocol.Error{Message: C.GoString(C.sqlite3_errstr(rc)), Code: int(rc)}
} }
return nil return nil
} }
func ConfigMultiThread() error { func ConfigMultiThread() error {
if rc := C.sqlite3ConfigMultiThread(); rc != 0 { if rc := C.sqlite3ConfigMultiThread(); rc != 0 {
return Error{Code: int(rc)} return protocol.Error{Message: C.GoString(C.sqlite3_errstr(rc)), Code: int(rc)}
} }
return nil return nil
} }
@ -197,6 +197,14 @@ func (s *Node) Recover(cluster []protocol.NodeInfo) error {
return nil return nil
} }
// GenerateID generates a unique ID for a server.
func GenerateID(address string) uint64 {
caddress := C.CString(address)
defer C.free(unsafe.Pointer(caddress))
id := C.dqlite_generate_node_id(caddress)
return uint64(id)
}
// Extract the underlying socket from a connection. // Extract the underlying socket from a connection.
func connToSocket(conn net.Conn) (C.int, error) { func connToSocket(conn net.Conn) (C.int, error) {
file, err := conn.(fileConn).File() file, err := conn.(fileConn).File()

View File

@ -2,13 +2,14 @@ package protocol
import ( import (
"time" "time"
"github.com/Rican7/retry/strategy"
) )
// Config holds various configuration parameters for a dqlite client. // Config holds various configuration parameters for a dqlite client.
type Config struct { type Config struct {
Dial DialFunc // Network dialer. Dial DialFunc // Network dialer.
AttemptTimeout time.Duration // Timeout for each individual Dial attempt. DialTimeout time.Duration // Timeout for establishing a network connection .
RetryStrategies []strategy.Strategy // Strategies used for retrying to connect to a leader. AttemptTimeout time.Duration // Timeout for each individual attempt to probe a server's leadership.
BackoffFactor time.Duration // Exponential backoff factor for retries.
BackoffCap time.Duration // Maximum connection retry backoff value,
RetryLimit uint // Maximum number of retries, or 0 for unlimited.
} }

View File

@ -6,8 +6,11 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"time"
"github.com/Rican7/retry" "github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/canonical/go-dqlite/internal/logging" "github.com/canonical/go-dqlite/internal/logging"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -27,6 +30,22 @@ type Connector struct {
// NewConnector returns a new connector that can be used by a dqlite driver to // NewConnector returns a new connector that can be used by a dqlite driver to
// create new clients connected to a leader dqlite server. // create new clients connected to a leader dqlite server.
func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) *Connector { func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) *Connector {
if config.DialTimeout == 0 {
config.DialTimeout = 10 * time.Second
}
if config.AttemptTimeout == 0 {
config.AttemptTimeout = 60 * time.Second
}
if config.BackoffFactor == 0 {
config.BackoffFactor = 100 * time.Millisecond
}
if config.BackoffCap == 0 {
config.BackoffCap = time.Second
}
connector := &Connector{ connector := &Connector{
id: id, id: id,
store: store, store: store,
@ -43,6 +62,8 @@ func NewConnector(id uint64, store NodeStore, config Config, log logging.Func) *
func (c *Connector) Connect(ctx context.Context) (*Protocol, error) { func (c *Connector) Connect(ctx context.Context) (*Protocol, error) {
var protocol *Protocol var protocol *Protocol
strategies := makeRetryStrategies(c.config.BackoffFactor, c.config.BackoffCap, c.config.RetryLimit)
// The retry strategy should be configured to retry indefinitely, until // The retry strategy should be configured to retry indefinitely, until
// the given context is done. // the given context is done.
err := retry.Retry(func(attempt uint) error { err := retry.Retry(func(attempt uint) error {
@ -61,17 +82,16 @@ func (c *Connector) Connect(ctx context.Context) (*Protocol, error) {
var err error var err error
protocol, err = c.connectAttemptAll(ctx, log) protocol, err = c.connectAttemptAll(ctx, log)
if err != nil { if err != nil {
log(logging.Debug, "connection failed err=%v", err)
return err return err
} }
return nil return nil
}, c.config.RetryStrategies...) }, strategies...)
if err != nil { if err != nil {
// The retry strategy should never give up until success or // We exhausted the number of retries allowed by the configured
// context expiration. // strategy.
panic("connect retry aborted unexpectedly") return nil, ErrNoAvailableLeader
} }
if ctx.Err() != nil { if ctx.Err() != nil {
@ -92,7 +112,7 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P
// Make an attempt for each address until we find the leader. // Make an attempt for each address until we find the leader.
for _, server := range servers { for _, server := range servers {
log := func(l logging.Level, format string, a ...interface{}) { log := func(l logging.Level, format string, a ...interface{}) {
format += fmt.Sprintf(" address=%s id=%d", server.Address, server.ID) format += fmt.Sprintf(" address=%s", server.Address)
log(l, format, a...) log(l, format, a...)
} }
@ -107,68 +127,71 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P
} }
if err != nil { if err != nil {
// This server is unavailable, try with the next target. // This server is unavailable, try with the next target.
log(logging.Debug, "server connection failed err=%v", err) log(logging.Warn, "server unavailable err=%v", err)
continue continue
} }
if protocol != nil { if protocol != nil {
// We found the leader // We found the leader
log(logging.Info, "connected") log(logging.Debug, "connected")
return protocol, nil return protocol, nil
} }
if leader == "" { if leader == "" {
// This server does not know who the current leader is, // This server does not know who the current leader is,
// try with the next target. // try with the next target.
log(logging.Warn, "no known leader")
continue continue
} }
// If we get here, it means this server reported that another // If we get here, it means this server reported that another
// server is the leader, let's close the connection to this // server is the leader, let's close the connection to this
// server and try with the suggested one. // server and try with the suggested one.
//logger = logger.With(zap.String("leader", leader)) log(logging.Debug, "connect to reported leader %s", leader)
ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()
protocol, leader, err = c.connectAttemptOne(ctx, leader, version) protocol, leader, err = c.connectAttemptOne(ctx, leader, version)
if err != nil { if err != nil {
// The leader reported by the previous server is // The leader reported by the previous server is
// unavailable, try with the next target. // unavailable, try with the next target.
//logger.Info("leader server connection failed", zap.String("err", err.Error())) log(logging.Warn, "reported leader unavailable err=%v", err)
continue continue
} }
if protocol == nil { if protocol == nil {
// The leader reported by the target server does not consider itself // The leader reported by the target server does not consider itself
// the leader, try with the next target. // the leader, try with the next target.
//logger.Info("reported leader server is not the leader") log(logging.Warn, "reported leader server is not the leader")
continue continue
} }
log(logging.Info, "connected") log(logging.Debug, "connected")
return protocol, nil return protocol, nil
} }
return nil, ErrNoAvailableLeader return nil, ErrNoAvailableLeader
} }
// Connect establishes a connection with a dqlite node. // Perform the initial handshake using the given protocol version.
func Connect(ctx context.Context, dial DialFunc, address string, version uint64) (*Protocol, error) { func Handshake(ctx context.Context, conn net.Conn, version uint64) (*Protocol, error) {
// Establish the connection.
conn, err := dial(ctx, address)
if err != nil {
return nil, errors.Wrap(err, "failed to establish network connection")
}
// Latest protocol version. // Latest protocol version.
protocol := make([]byte, 8) protocol := make([]byte, 8)
binary.LittleEndian.PutUint64(protocol, version) binary.LittleEndian.PutUint64(protocol, version)
// Honor the ctx deadline, if present.
if deadline, ok := ctx.Deadline(); ok {
conn.SetDeadline(deadline)
defer conn.SetDeadline(time.Time{})
}
// Perform the protocol handshake. // Perform the protocol handshake.
n, err := conn.Write(protocol) n, err := conn.Write(protocol)
if err != nil { if err != nil {
conn.Close()
return nil, errors.Wrap(err, "failed to send handshake") return nil, errors.Wrap(err, "failed to send handshake")
} }
if n != 8 { if n != 8 {
conn.Close()
return nil, errors.Wrap(io.ErrShortWrite, "failed to send handshake") return nil, errors.Wrap(io.ErrShortWrite, "failed to send handshake")
} }
return NewProtocol(version, conn), nil return newProtocol(version, conn), nil
} }
// Connect to the given dqlite server and check if it's the leader. // Connect to the given dqlite server and check if it's the leader.
@ -181,8 +204,18 @@ func Connect(ctx context.Context, dial DialFunc, address string, version uint64)
// - Target is the leader: -> server, "", nil // - Target is the leader: -> server, "", nil
// //
func (c *Connector) connectAttemptOne(ctx context.Context, address string, version uint64) (*Protocol, string, error) { func (c *Connector) connectAttemptOne(ctx context.Context, address string, version uint64) (*Protocol, string, error) {
protocol, err := Connect(ctx, c.config.Dial, address, version) dialCtx, cancel := context.WithTimeout(ctx, c.config.DialTimeout)
defer cancel()
// Establish the connection.
conn, err := c.config.Dial(dialCtx, address)
if err != nil { if err != nil {
return nil, "", errors.Wrap(err, "failed to establish network connection")
}
protocol, err := Handshake(ctx, conn, version)
if err != nil {
conn.Close()
return nil, "", err return nil, "", err
} }
@ -219,8 +252,8 @@ func (c *Connector) connectAttemptOne(ctx context.Context, address string, versi
return nil, "", nil return nil, "", nil
case address: case address:
// This server is the leader, register ourselves and return. // This server is the leader, register ourselves and return.
request.Reset() request.reset()
response.Reset() response.reset()
EncodeClient(&request, c.id) EncodeClient(&request, c.id)
@ -247,4 +280,33 @@ func (c *Connector) connectAttemptOne(ctx context.Context, address string, versi
} }
} }
// Return a retry strategy with exponential backoff, capped at the given amount
// of time and possibly with a maximum number of retries.
func makeRetryStrategies(factor, cap time.Duration, limit uint) []strategy.Strategy {
backoff := backoff.BinaryExponential(factor)
strategies := []strategy.Strategy{}
if limit > 0 {
strategies = append(strategies, strategy.Limit(limit))
}
strategies = append(strategies,
func(attempt uint) bool {
if attempt > 0 {
duration := backoff(attempt)
// Duration might be negative in case of integer overflow.
if duration > cap || duration <= 0 {
duration = cap
}
time.Sleep(duration)
}
return true
},
)
return strategies
}
var errBadProtocol = fmt.Errorf("bad protocol") var errBadProtocol = fmt.Errorf("bad protocol")

View File

@ -2,6 +2,7 @@ package protocol
import ( import (
"context" "context"
"crypto/tls"
"net" "net"
) )
@ -18,3 +19,15 @@ func UnixDial(ctx context.Context, address string) (net.Conn, error) {
dialer := net.Dialer{} dialer := net.Dialer{}
return dialer.DialContext(ctx, "unix", address) return dialer.DialContext(ctx, "unix", address)
} }
// TLSCipherSuites are the cipher suites by the go-dqlite TLS helpers.
var TLSCipherSuites = []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
}

View File

@ -27,3 +27,13 @@ func (e ErrRequest) Error() string {
// ErrRowsPart is returned when the first batch of a multi-response result // ErrRowsPart is returned when the first batch of a multi-response result
// batch is done. // batch is done.
var ErrRowsPart = fmt.Errorf("not all rows were returned in this response") var ErrRowsPart = fmt.Errorf("not all rows were returned in this response")
// Error holds information about a SQLite error.
type Error struct {
Code int
Message string
}
func (e Error) Error() string {
return e.Message
}

View File

@ -26,24 +26,23 @@ type Message struct {
flags uint8 flags uint8
extra uint16 extra uint16
header []byte // Statically allocated header buffer header []byte // Statically allocated header buffer
body1 buffer // Statically allocated body data, using bytes body buffer // Message body data.
body2 buffer // Dynamically allocated body data
} }
// Init initializes the message using the given size of the statically // Init initializes the message using the given initial size for the data
// allocated buffer (i.e. a buffer which is re-used across requests or // buffer, which is re-used across requests or responses encoded or decoded
// responses encoded or decoded using this message object). // using this message object.
func (m *Message) Init(staticSize int) { func (m *Message) Init(initialBufferSize int) {
if (staticSize % messageWordSize) != 0 { if (initialBufferSize % messageWordSize) != 0 {
panic("static size is not aligned to word boundary") panic("initial buffer size is not aligned to word boundary")
} }
m.header = make([]byte, messageHeaderSize) m.header = make([]byte, messageHeaderSize)
m.body1.Bytes = make([]byte, staticSize) m.body.Bytes = make([]byte, initialBufferSize)
m.Reset() m.reset()
} }
// Reset the state of the message so it can be used to encode or decode again. // Reset the state of the message so it can be used to encode or decode again.
func (m *Message) Reset() { func (m *Message) reset() {
m.words = 0 m.words = 0
m.mtype = 0 m.mtype = 0
m.flags = 0 m.flags = 0
@ -51,9 +50,7 @@ func (m *Message) Reset() {
for i := 0; i < messageHeaderSize; i++ { for i := 0; i < messageHeaderSize; i++ {
m.header[i] = 0 m.header[i] = 0
} }
m.body1.Offset = 0 m.body.Offset = 0
m.body2.Bytes = nil
m.body2.Offset = 0
} }
// Append a byte slice to the message. // Append a byte slice to the message.
@ -232,11 +229,11 @@ func (m *Message) putNamedValues(values NamedValues) {
// Finalize the message by setting the message type and the number // Finalize the message by setting the message type and the number
// of words in the body (calculated from the body size). // of words in the body (calculated from the body size).
func (m *Message) putHeader(mtype uint8) { func (m *Message) putHeader(mtype uint8) {
if m.body1.Offset <= 0 { if m.body.Offset <= 0 {
panic("static offset is not positive") panic("static offset is not positive")
} }
if (m.body1.Offset % messageWordSize) != 0 { if (m.body.Offset % messageWordSize) != 0 {
panic("static body is not aligned") panic("static body is not aligned")
} }
@ -244,22 +241,7 @@ func (m *Message) putHeader(mtype uint8) {
m.flags = 0 m.flags = 0
m.extra = 0 m.extra = 0
m.words = uint32(m.body1.Offset) / messageWordSize m.words = uint32(m.body.Offset) / messageWordSize
if m.body2.Bytes == nil {
m.finalize()
return
}
if m.body2.Offset <= 0 {
panic("dynamic offset is not positive")
}
if (m.body2.Offset % messageWordSize) != 0 {
panic("dynamic body is not aligned")
}
m.words += uint32(m.body2.Offset) / messageWordSize
m.finalize() m.finalize()
} }
@ -276,27 +258,14 @@ func (m *Message) finalize() {
} }
func (m *Message) bufferForPut(size int) *buffer { func (m *Message) bufferForPut(size int) *buffer {
if m.body2.Bytes != nil { for (m.body.Offset + size) > len(m.body.Bytes) {
if (m.body2.Offset + size) > len(m.body2.Bytes) { // Grow message buffer.
// Grow body2. bytes := make([]byte, len(m.body.Bytes)*2)
// copy(bytes, m.body.Bytes)
// TODO: find a good grow strategy. m.body.Bytes = bytes
bytes := make([]byte, m.body2.Offset+size)
copy(bytes, m.body2.Bytes)
m.body2.Bytes = bytes
} }
return &m.body2 return &m.body
}
if (m.body1.Offset + size) > len(m.body1.Bytes) {
m.body2.Bytes = make([]byte, size)
m.body2.Offset = 0
return &m.body2
}
return &m.body1
} }
// Return the message type and its flags. // Return the message type and its flags.
@ -310,31 +279,6 @@ func (m *Message) getString() string {
index := bytes.IndexByte(b.Bytes[b.Offset:], 0) index := bytes.IndexByte(b.Bytes[b.Offset:], 0)
if index == -1 { if index == -1 {
// Check if the string overflows in the dynamic buffer.
if b == &m.body1 && m.body2.Bytes != nil {
// Assert that this is the first read of the dynamic buffer.
if m.body2.Offset != 0 {
panic("static buffer read after dynamic buffer one")
}
index = bytes.IndexByte(m.body2.Bytes[0:], 0)
if index != -1 {
// We found the trailing part of the string.
data := b.Bytes[b.Offset:]
data = append(data, m.body2.Bytes[0:index]...)
index++
if trailing := index % messageWordSize; trailing != 0 {
// Account for padding, moving index to the next word boundary.
index += messageWordSize - trailing
}
m.body1.Offset = len(m.body1.Bytes)
m.body2.Advance(index)
return string(data)
}
}
panic("no string found") panic("no string found")
} }
s := string(b.Bytes[b.Offset : b.Offset+index]) s := string(b.Bytes[b.Offset : b.Offset+index])
@ -465,31 +409,23 @@ func (m *Message) getFiles() Files {
func (m *Message) hasBeenConsumed() bool { func (m *Message) hasBeenConsumed() bool {
size := int(m.words * messageWordSize) size := int(m.words * messageWordSize)
return (m.body1.Offset == size || m.body1.Offset == len(m.body1.Bytes)) && return m.body.Offset == size
m.body1.Offset+m.body2.Offset == size
} }
func (m *Message) lastByte() byte { func (m *Message) lastByte() byte {
size := int(m.words * messageWordSize) size := int(m.words * messageWordSize)
if size > len(m.body1.Bytes) { return m.body.Bytes[size-1]
size = size - m.body1.Offset
return m.body2.Bytes[size-1]
}
return m.body1.Bytes[size-1]
} }
func (m *Message) bufferForGet() *buffer { func (m *Message) bufferForGet() *buffer {
size := int(m.words * messageWordSize) size := int(m.words * messageWordSize)
if m.body1.Offset == size || m.body1.Offset == len(m.body1.Bytes) {
// The static body has been exahusted, use the dynamic one. // The static body has been exahusted, use the dynamic one.
if m.body1.Offset+m.body2.Offset == size { if m.body.Offset == size {
err := fmt.Errorf("short message: type=%d words=%d off=%d", m.mtype, m.words, m.body1.Offset) err := fmt.Errorf("short message: type=%d words=%d off=%d", m.mtype, m.words, m.body.Offset)
panic(err) panic(err)
} }
return &m.body2
}
return &m.body1 return &m.body
} }
// Result holds the result of a statement. // Result holds the result of a statement.
@ -639,7 +575,7 @@ func (r *Rows) Close() error {
err = fmt.Errorf("unexpected end of message") err = fmt.Errorf("unexpected end of message")
} }
} }
r.message.Reset() r.message.reset()
return err return err
} }
@ -664,7 +600,7 @@ func (f *Files) Next() (string, []byte) {
} }
func (f *Files) Close() { func (f *Files) Close() {
f.message.Reset() f.message.reset()
} }
const ( const (

View File

@ -15,29 +15,21 @@ import (
type Protocol struct { type Protocol struct {
version uint64 // Protocol version version uint64 // Protocol version
conn net.Conn // Underlying network connection. conn net.Conn // Underlying network connection.
contextTimeout time.Duration // Default context timeout.
closeCh chan struct{} // Stops the heartbeat when the connection gets closed closeCh chan struct{} // Stops the heartbeat when the connection gets closed
mu sync.Mutex // Serialize requests mu sync.Mutex // Serialize requests
netErr error // A network error occurred netErr error // A network error occurred
} }
func NewProtocol(version uint64, conn net.Conn) *Protocol { func newProtocol(version uint64, conn net.Conn) *Protocol {
protocol := &Protocol{ protocol := &Protocol{
version: version, version: version,
conn: conn, conn: conn,
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
contextTimeout: 5 * time.Second,
} }
return protocol return protocol
} }
// SetContextTimeout sets the default context timeout when no deadline is
// provided.
func (p *Protocol) SetContextTimeout(timeout time.Duration) {
p.contextTimeout = timeout
}
// Call invokes a dqlite RPC, sending a request message and receiving a // Call invokes a dqlite RPC, sending a request message and receiving a
// response message. // response message.
func (p *Protocol) Call(ctx context.Context, request, response *Message) (err error) { func (p *Protocol) Call(ctx context.Context, request, response *Message) (err error) {
@ -50,21 +42,22 @@ func (p *Protocol) Call(ctx context.Context, request, response *Message) (err er
return p.netErr return p.netErr
} }
// Honor the ctx deadline, if present, or use a default. var budget time.Duration
deadline, ok := ctx.Deadline()
if !ok { // Honor the ctx deadline, if present.
deadline = time.Now().Add(p.contextTimeout) if deadline, ok := ctx.Deadline(); ok {
p.conn.SetDeadline(deadline)
budget = time.Until(deadline)
defer p.conn.SetDeadline(time.Time{})
} }
p.conn.SetDeadline(deadline)
if err = p.send(request); err != nil { if err = p.send(request); err != nil {
err = errors.Wrap(err, "failed to send request") err = errors.Wrapf(err, "send request (budget=%s)", budget)
goto err goto err
} }
if err = p.recv(response); err != nil { if err = p.recv(response); err != nil {
err = errors.Wrap(err, "failed to receive response") err = errors.Wrapf(err, "receive response (budget=%s)", budget)
goto err goto err
} }
@ -91,14 +84,11 @@ func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Me
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
// Honor the ctx deadline, if present, or use a default. // Honor the ctx deadline, if present.
deadline, ok := ctx.Deadline() if deadline, ok := ctx.Deadline(); ok {
if !ok {
deadline = time.Now().Add(2 * time.Second)
}
p.conn.SetDeadline(deadline) p.conn.SetDeadline(deadline)
defer p.conn.SetDeadline(time.Time{})
defer request.Reset() }
EncodeInterrupt(request, 0) EncodeInterrupt(request, 0)
@ -108,12 +98,10 @@ func (p *Protocol) Interrupt(ctx context.Context, request *Message, response *Me
for { for {
if err := p.recv(response); err != nil { if err := p.recv(response); err != nil {
response.Reset()
return errors.Wrap(err, "failed to receive response") return errors.Wrap(err, "failed to receive response")
} }
mtype, _ := response.getHeader() mtype, _ := response.getHeader()
response.Reset()
if mtype == ResponseEmpty { if mtype == ResponseEmpty {
break break
@ -155,7 +143,7 @@ func (p *Protocol) sendHeader(req *Message) error {
} }
func (p *Protocol) sendBody(req *Message) error { func (p *Protocol) sendBody(req *Message) error {
buf := req.body1.Bytes[:req.body1.Offset] buf := req.body.Bytes[:req.body.Offset]
n, err := p.conn.Write(buf) n, err := p.conn.Write(buf)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to send static body") return errors.Wrap(err, "failed to send static body")
@ -165,24 +153,12 @@ func (p *Protocol) sendBody(req *Message) error {
return errors.Wrap(io.ErrShortWrite, "failed to write body") return errors.Wrap(io.ErrShortWrite, "failed to write body")
} }
if req.body2.Bytes == nil {
return nil
}
buf = req.body2.Bytes[:req.body2.Offset]
n, err = p.conn.Write(buf)
if err != nil {
return errors.Wrap(err, "failed to send dynamic body")
}
if n != len(buf) {
return errors.Wrap(io.ErrShortWrite, "failed to write body")
}
return nil return nil
} }
func (p *Protocol) recv(res *Message) error { func (p *Protocol) recv(res *Message) error {
res.reset()
if err := p.recvHeader(res); err != nil { if err := p.recvHeader(res); err != nil {
return errors.Wrap(err, "failed to receive header") return errors.Wrap(err, "failed to receive header")
} }
@ -209,30 +185,19 @@ func (p *Protocol) recvHeader(res *Message) error {
func (p *Protocol) recvBody(res *Message) error { func (p *Protocol) recvBody(res *Message) error {
n := int(res.words) * messageWordSize n := int(res.words) * messageWordSize
n1 := n
n2 := 0
if n1 > len(res.body1.Bytes) { for n > len(res.body.Bytes) {
// We need to allocate the dynamic buffer. // Grow message buffer.
n1 = len(res.body1.Bytes) bytes := make([]byte, len(res.body.Bytes)*2)
n2 = n - n1 res.body.Bytes = bytes
} }
buf := res.body1.Bytes[:n1] buf := res.body.Bytes[:n]
if err := p.recvPeek(buf); err != nil { if err := p.recvPeek(buf); err != nil {
return errors.Wrap(err, "failed to read body") return errors.Wrap(err, "failed to read body")
} }
if n2 > 0 {
res.body2.Bytes = make([]byte, n2)
res.body2.Offset = 0
buf = res.body2.Bytes
if err := p.recvPeek(buf); err != nil {
return errors.Wrap(err, "failed to read body")
}
}
return nil return nil
} }

View File

@ -7,6 +7,7 @@ package protocol
// EncodeLeader encodes a Leader request. // EncodeLeader encodes a Leader request.
func EncodeLeader(request *Message) { func EncodeLeader(request *Message) {
request.reset()
request.putUint64(0) request.putUint64(0)
request.putHeader(RequestLeader) request.putHeader(RequestLeader)
@ -14,6 +15,7 @@ func EncodeLeader(request *Message) {
// EncodeClient encodes a Client request. // EncodeClient encodes a Client request.
func EncodeClient(request *Message, id uint64) { func EncodeClient(request *Message, id uint64) {
request.reset()
request.putUint64(id) request.putUint64(id)
request.putHeader(RequestClient) request.putHeader(RequestClient)
@ -21,6 +23,7 @@ func EncodeClient(request *Message, id uint64) {
// EncodeHeartbeat encodes a Heartbeat request. // EncodeHeartbeat encodes a Heartbeat request.
func EncodeHeartbeat(request *Message, timestamp uint64) { func EncodeHeartbeat(request *Message, timestamp uint64) {
request.reset()
request.putUint64(timestamp) request.putUint64(timestamp)
request.putHeader(RequestHeartbeat) request.putHeader(RequestHeartbeat)
@ -28,6 +31,7 @@ func EncodeHeartbeat(request *Message, timestamp uint64) {
// EncodeOpen encodes a Open request. // EncodeOpen encodes a Open request.
func EncodeOpen(request *Message, name string, flags uint64, vfs string) { func EncodeOpen(request *Message, name string, flags uint64, vfs string) {
request.reset()
request.putString(name) request.putString(name)
request.putUint64(flags) request.putUint64(flags)
request.putString(vfs) request.putString(vfs)
@ -37,6 +41,7 @@ func EncodeOpen(request *Message, name string, flags uint64, vfs string) {
// EncodePrepare encodes a Prepare request. // EncodePrepare encodes a Prepare request.
func EncodePrepare(request *Message, db uint64, sql string) { func EncodePrepare(request *Message, db uint64, sql string) {
request.reset()
request.putUint64(db) request.putUint64(db)
request.putString(sql) request.putString(sql)
@ -45,6 +50,7 @@ func EncodePrepare(request *Message, db uint64, sql string) {
// EncodeExec encodes a Exec request. // EncodeExec encodes a Exec request.
func EncodeExec(request *Message, db uint32, stmt uint32, values NamedValues) { func EncodeExec(request *Message, db uint32, stmt uint32, values NamedValues) {
request.reset()
request.putUint32(db) request.putUint32(db)
request.putUint32(stmt) request.putUint32(stmt)
request.putNamedValues(values) request.putNamedValues(values)
@ -54,6 +60,7 @@ func EncodeExec(request *Message, db uint32, stmt uint32, values NamedValues) {
// EncodeQuery encodes a Query request. // EncodeQuery encodes a Query request.
func EncodeQuery(request *Message, db uint32, stmt uint32, values NamedValues) { func EncodeQuery(request *Message, db uint32, stmt uint32, values NamedValues) {
request.reset()
request.putUint32(db) request.putUint32(db)
request.putUint32(stmt) request.putUint32(stmt)
request.putNamedValues(values) request.putNamedValues(values)
@ -63,6 +70,7 @@ func EncodeQuery(request *Message, db uint32, stmt uint32, values NamedValues) {
// EncodeFinalize encodes a Finalize request. // EncodeFinalize encodes a Finalize request.
func EncodeFinalize(request *Message, db uint32, stmt uint32) { func EncodeFinalize(request *Message, db uint32, stmt uint32) {
request.reset()
request.putUint32(db) request.putUint32(db)
request.putUint32(stmt) request.putUint32(stmt)
@ -71,6 +79,7 @@ func EncodeFinalize(request *Message, db uint32, stmt uint32) {
// EncodeExecSQL encodes a ExecSQL request. // EncodeExecSQL encodes a ExecSQL request.
func EncodeExecSQL(request *Message, db uint64, sql string, values NamedValues) { func EncodeExecSQL(request *Message, db uint64, sql string, values NamedValues) {
request.reset()
request.putUint64(db) request.putUint64(db)
request.putString(sql) request.putString(sql)
request.putNamedValues(values) request.putNamedValues(values)
@ -80,6 +89,7 @@ func EncodeExecSQL(request *Message, db uint64, sql string, values NamedValues)
// EncodeQuerySQL encodes a QuerySQL request. // EncodeQuerySQL encodes a QuerySQL request.
func EncodeQuerySQL(request *Message, db uint64, sql string, values NamedValues) { func EncodeQuerySQL(request *Message, db uint64, sql string, values NamedValues) {
request.reset()
request.putUint64(db) request.putUint64(db)
request.putString(sql) request.putString(sql)
request.putNamedValues(values) request.putNamedValues(values)
@ -89,6 +99,7 @@ func EncodeQuerySQL(request *Message, db uint64, sql string, values NamedValues)
// EncodeInterrupt encodes a Interrupt request. // EncodeInterrupt encodes a Interrupt request.
func EncodeInterrupt(request *Message, db uint64) { func EncodeInterrupt(request *Message, db uint64) {
request.reset()
request.putUint64(db) request.putUint64(db)
request.putHeader(RequestInterrupt) request.putHeader(RequestInterrupt)
@ -96,6 +107,7 @@ func EncodeInterrupt(request *Message, db uint64) {
// EncodeAdd encodes a Add request. // EncodeAdd encodes a Add request.
func EncodeAdd(request *Message, id uint64, address string) { func EncodeAdd(request *Message, id uint64, address string) {
request.reset()
request.putUint64(id) request.putUint64(id)
request.putString(address) request.putString(address)
@ -104,6 +116,7 @@ func EncodeAdd(request *Message, id uint64, address string) {
// EncodeAssign encodes a Assign request. // EncodeAssign encodes a Assign request.
func EncodeAssign(request *Message, id uint64, role uint64) { func EncodeAssign(request *Message, id uint64, role uint64) {
request.reset()
request.putUint64(id) request.putUint64(id)
request.putUint64(role) request.putUint64(role)
@ -112,6 +125,7 @@ func EncodeAssign(request *Message, id uint64, role uint64) {
// EncodeRemove encodes a Remove request. // EncodeRemove encodes a Remove request.
func EncodeRemove(request *Message, id uint64) { func EncodeRemove(request *Message, id uint64) {
request.reset()
request.putUint64(id) request.putUint64(id)
request.putHeader(RequestRemove) request.putHeader(RequestRemove)
@ -119,6 +133,7 @@ func EncodeRemove(request *Message, id uint64) {
// EncodeDump encodes a Dump request. // EncodeDump encodes a Dump request.
func EncodeDump(request *Message, name string) { func EncodeDump(request *Message, name string) {
request.reset()
request.putString(name) request.putString(name)
request.putHeader(RequestDump) request.putHeader(RequestDump)
@ -126,6 +141,7 @@ func EncodeDump(request *Message, name string) {
// EncodeCluster encodes a Cluster request. // EncodeCluster encodes a Cluster request.
func EncodeCluster(request *Message, format uint64) { func EncodeCluster(request *Message, format uint64) {
request.reset()
request.putUint64(format) request.putUint64(format)
request.putHeader(RequestCluster) request.putHeader(RequestCluster)
@ -133,6 +149,7 @@ func EncodeCluster(request *Message, format uint64) {
// EncodeTransfer encodes a Transfer request. // EncodeTransfer encodes a Transfer request.
func EncodeTransfer(request *Message, id uint64) { func EncodeTransfer(request *Message, id uint64) {
request.reset()
request.putUint64(id) request.putUint64(id)
request.putHeader(RequestTransfer) request.putHeader(RequestTransfer)

View File

@ -54,6 +54,7 @@ if [ "$entity" = "--request" ]; then
// Encode${cmd} encodes a $cmd request. // Encode${cmd} encodes a $cmd request.
func Encode${cmd}(request *Message${args}) { func Encode${cmd}(request *Message${args}) {
request.reset()
EOF EOF
for i in "${@}" for i in "${@}"

View File

@ -118,6 +118,16 @@ func (s *Node) Close() error {
return nil return nil
} }
// BootstrapID is a magic ID that should be used for the fist node in a
// cluster. Alternatively ID 1 can be used as well.
const BootstrapID = 0x2dc171858c3155be
// GenerateID generates a unique ID for a new node, based on a hash of its
// address and the current time.
func GenerateID(address string) uint64 {
return bindings.GenerateID(address)
}
// Create a options object with sane defaults. // Create a options object with sane defaults.
func defaultOptions() *options { func defaultOptions() *options {
return &options{ return &options{

4
vendor/modules.txt vendored
View File

@ -135,7 +135,7 @@ github.com/blang/semver
github.com/bronze1man/goStrongswanVici github.com/bronze1man/goStrongswanVici
# github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 # github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23
github.com/buger/jsonparser github.com/buger/jsonparser
# github.com/canonical/go-dqlite v1.3.0 # github.com/canonical/go-dqlite v1.5.1
github.com/canonical/go-dqlite github.com/canonical/go-dqlite
github.com/canonical/go-dqlite/client github.com/canonical/go-dqlite/client
github.com/canonical/go-dqlite/driver github.com/canonical/go-dqlite/driver
@ -730,7 +730,7 @@ github.com/rancher/helm-controller/pkg/generated/informers/externalversions/helm
github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces github.com/rancher/helm-controller/pkg/generated/informers/externalversions/internalinterfaces
github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1 github.com/rancher/helm-controller/pkg/generated/listers/helm.cattle.io/v1
github.com/rancher/helm-controller/pkg/helm github.com/rancher/helm-controller/pkg/helm
# github.com/rancher/kine v0.3.6-0.20200422224205-0a0f5b924129 # github.com/rancher/kine v0.4.0
github.com/rancher/kine/pkg/broadcaster github.com/rancher/kine/pkg/broadcaster
github.com/rancher/kine/pkg/client github.com/rancher/kine/pkg/client
github.com/rancher/kine/pkg/drivers/dqlite github.com/rancher/kine/pkg/drivers/dqlite