Merge pull request #1316 from chrisfleming/chrisfleming-dqlite-update

Update dqlite to 1.3.1-r1
pull/1323/head
Erik Wilson 2020-01-20 16:03:32 -07:00 committed by GitHub
commit d8cf1edf65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 247 additions and 83 deletions

View File

@ -19,7 +19,7 @@ RUN if [ "${ARCH}" == "amd64" ]; then \
ARG DQLITE=true
ENV DQLITE $DQLITE
COPY --from=rancher/dqlite-build:v1.2.1-r3 /dist/artifacts /usr/src/
COPY --from=rancher/dqlite-build:v1.3.1-r1 /dist/artifacts /usr/src/
RUN if [ "$DQLITE" = true ]; then \
tar xzf /usr/src/dqlite.tgz -C / && \
apk add --allow-untrusted /usr/local/packages/*.apk \

2
go.mod
View File

@ -64,7 +64,7 @@ require (
github.com/bhendo/go-powershell v0.0.0-20190719160123-219e7fb4e41e // indirect
github.com/bronze1man/goStrongswanVici v0.0.0-20190828090544-27d02f80ba40 // indirect
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/canonical/go-dqlite v1.2.0
github.com/canonical/go-dqlite v1.3.0
github.com/containerd/cgroups v0.0.0-00010101000000-000000000000 // indirect
github.com/containerd/containerd v1.3.0-beta.2.0.20190828155532-0293cbd26c69
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 // indirect

8
go.sum
View File

@ -116,6 +116,8 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7
github.com/caddyserver/caddy v1.0.3/go.mod h1:G+ouvOY32gENkJC+jhgl62TyhvqEsFaDiZ4uw0RzP1E=
github.com/canonical/go-dqlite v1.2.0 h1:TCsNV/mAmPy7PQa4jIQMoiXbCUVqYTSv6OJSpi/qdX8=
github.com/canonical/go-dqlite v1.2.0/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y=
github.com/canonical/go-dqlite v1.3.0 h1:c+7eGZfh0K7yCmGrBkNRGZdY8R8+2jSSkz6Zr3YCjJE=
github.com/canonical/go-dqlite v1.3.0/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y=
github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@ -713,12 +715,6 @@ github.com/rancher/dynamiclistener v0.2.0 h1:KucYwJXVVGhZ/NndfMCeQoCafT/VN7kvqSG
github.com/rancher/dynamiclistener v0.2.0/go.mod h1:fs/dxyNcB3YT6W9fVz4bDGfhmSQS17QQup6BIcGF++s=
github.com/rancher/flannel v0.11.0-k3s.1 h1:mIwnfWDafjzQgFkZeJ1AkFrrAT3EdBaA1giE0eLJKo8=
github.com/rancher/flannel v0.11.0-k3s.1/go.mod h1:Hn4ZV+eq0LhLZP63xZnxdGwXEoRSxs5sxELxu27M3UA=
github.com/rancher/helm-controller v0.3.0 h1:sYRpOiJc4+NmSEkft3lR2pwaEPOrPzZOTo2UjFnVF4I=
github.com/rancher/helm-controller v0.3.0/go.mod h1:194LHuZRrxcD82bG1rJtOWsw98U4JbPhDWqvL7l3PAw=
github.com/rancher/helm-controller v0.4.0 h1:VO7TqMsMWh3TJK7U38LLht1uHiDy0fSYWSywT0bE5lY=
github.com/rancher/helm-controller v0.4.0/go.mod h1:194LHuZRrxcD82bG1rJtOWsw98U4JbPhDWqvL7l3PAw=
github.com/rancher/helm-controller v0.4.1-0.20191219210749-b81cad6ffd60 h1:c4UL7AcwFGoZIit6EoVS9S1ylj4IGhPCtQM23iFXBzA=
github.com/rancher/helm-controller v0.4.1-0.20191219210749-b81cad6ffd60/go.mod h1:194LHuZRrxcD82bG1rJtOWsw98U4JbPhDWqvL7l3PAw=
github.com/rancher/helm-controller v0.4.1-0.20191223195946-ae918063bc54 h1:S/IAloMp6MYYtoHKfJUS5M3Wv0CU+5Ixe1nnSv2pQUU=
github.com/rancher/helm-controller v0.4.1-0.20191223195946-ae918063bc54/go.mod h1:194LHuZRrxcD82bG1rJtOWsw98U4JbPhDWqvL7l3PAw=
github.com/rancher/kine v0.3.2 h1:2kP48ojBWVoZ6vlzixc9jc9uKRk7Yn1a7kWoOsJi7Sg=

View File

@ -1,7 +1,7 @@
;;; Directory Local Variables
;;; For more information see (info "(emacs) Directory Variables")
((go-mode
. ((go-test-args . "-tags libsqlite3 -timeout 10s")
. ((go-test-args . "-tags libsqlite3 -timeout 60s")
(eval
. (set
(make-local-variable 'flycheck-go-build-tags)

View File

@ -1,4 +1,4 @@
dist: xenial
dist: bionic
language: go
addons:

View File

@ -107,7 +107,7 @@ func (c *Client) Cluster(ctx context.Context) ([]NodeInfo, error) {
response := protocol.Message{}
response.Init(512)
protocol.EncodeCluster(&request)
protocol.EncodeCluster(&request, protocol.ClusterFormatV1)
if err := c.protocol.Call(ctx, &request, &response); err != nil {
return nil, errors.Wrap(err, "failed to send Cluster request")
@ -163,24 +163,86 @@ func (c *Client) Dump(ctx context.Context, dbname string) ([]File, error) {
}
// Add a node to a cluster.
//
// The new node will have the role specified in node.Role. Note that if the
// desired role is Voter, the node being added must be online, since it will be
// granted voting rights only once it catches up with the leader's log.
func (c *Client) Add(ctx context.Context, node NodeInfo) error {
request := protocol.Message{}
request.Init(4096)
response := protocol.Message{}
request.Init(4096)
response.Init(4096)
protocol.EncodeJoin(&request, node.ID, node.Address)
protocol.EncodeAdd(&request, node.ID, node.Address)
if err := c.protocol.Call(ctx, &request, &response); err != nil {
return err
}
protocol.EncodePromote(&request, node.ID)
if err := protocol.DecodeEmpty(&response); err != nil {
return err
}
// If the desired role is spare, there's nothing to do, since all newly
// added nodes have the spare role.
if node.Role == Spare {
return nil
}
return c.Assign(ctx, node.ID, node.Role)
}
// Assign a role to a node.
//
// Possible roles are:
//
// - Voter: the node will replicate data and participate in quorum.
// - StandBy: the node will replicate data but won't participate in quorum.
// - Spare: the node won't replicate data and won't participate in quorum.
//
// If the target node does not exist or has already the desired role, an error
// is returned.
func (c *Client) Assign(ctx context.Context, id uint64, role NodeRole) error {
request := protocol.Message{}
response := protocol.Message{}
request.Init(4096)
response.Init(4096)
protocol.EncodeAssign(&request, id, uint64(role))
if err := c.protocol.Call(ctx, &request, &response); err != nil {
return err
}
if err := protocol.DecodeEmpty(&response); err != nil {
return err
}
return nil
}
// Transfer leadership from the current leader to another node.
//
// This must be invoked one client connected to the current leader.
func (c *Client) Transfer(ctx context.Context, id uint64) error {
request := protocol.Message{}
response := protocol.Message{}
request.Init(4096)
response.Init(4096)
protocol.EncodeTransfer(&request, id)
if err := c.protocol.Call(ctx, &request, &response); err != nil {
return err
}
if err := protocol.DecodeEmpty(&response); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,12 @@
package client
import (
"github.com/canonical/go-dqlite/internal/protocol"
)
// Node roles
const (
Voter = protocol.Voter
StandBy = protocol.StandBy
Spare = protocol.Spare
)

View File

@ -15,6 +15,9 @@ import (
// dqlite nodes that it can dial in order to find a leader dqlite node to use.
type NodeStore = protocol.NodeStore
// NodeRole identifies the role of a node.
type NodeRole = protocol.NodeRole
// NodeInfo holds information about a single server.
type NodeInfo = protocol.NodeInfo
@ -30,6 +33,7 @@ type DatabaseNodeStore struct {
schema string // Name of the schema holding the servers table.
table string // Name of the servers table.
column string // Column name in the servers table holding the server address.
where string // Optional WHERE filter
}
// DefaultNodeStore creates a new NodeStore using the given filename to
@ -59,13 +63,35 @@ func DefaultNodeStore(filename string) (*DatabaseNodeStore, error) {
return store, nil
}
// Option that can be used to tweak node store parameters.
type NodeStoreOption func(*nodeStoreOptions)
type nodeStoreOptions struct {
Where string
}
// WithNodeStoreWhereClause configures the node store to append the given
// hard-coded where clause to the SELECT query used to fetch nodes. Only the
// clause itself must be given, without the "WHERE" prefix.
func WithNodeStoreWhereClause(where string) NodeStoreOption {
return func(options *nodeStoreOptions) {
options.Where = where
}
}
// NewNodeStore creates a new NodeStore.
func NewNodeStore(db *sql.DB, schema, table, column string) *DatabaseNodeStore {
func NewNodeStore(db *sql.DB, schema, table, column string, options ...NodeStoreOption) *DatabaseNodeStore {
o := &nodeStoreOptions{}
for _, option := range options {
option(o)
}
return &DatabaseNodeStore{
db: db,
schema: schema,
table: table,
column: column,
where: o.Where,
}
}
@ -78,6 +104,9 @@ func (d *DatabaseNodeStore) Get(ctx context.Context) ([]NodeInfo, error) {
defer tx.Rollback()
query := fmt.Sprintf("SELECT %s FROM %s.%s", d.column, d.schema, d.table)
if d.where != "" {
query += " WHERE " + d.where
}
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return nil, errors.Wrap(err, "failed to query servers table")

View File

@ -18,8 +18,10 @@ import (
"context"
"database/sql/driver"
"io"
"log"
"net"
"reflect"
"syscall"
"time"
"github.com/Rican7/retry/backoff"
@ -624,8 +626,14 @@ func (r *Rows) ColumnTypeDatabaseTypeName(i int) string {
if r.types == nil {
var err error
r.types, err = r.rows.ColumnTypes()
if err != nil {
panic(err)
// an error might not matter if we get our types
if err != nil && i >= len(r.types) {
// a panic here doesn't really help,
// as an empty column type is not the end of the world
// but we should still inform the user of the failure
const msg = "row (%p) error returning column #%d type: %v\n"
log.Printf(msg, r, i, err)
return ""
}
}
return r.types[i]
@ -645,6 +653,8 @@ func valuesToNamedValues(args []driver.Value) []driver.NamedValue {
func driverError(err error) error {
switch err := errors.Cause(err).(type) {
case syscall.Errno:
return driver.ErrBadConn
case *net.OpError:
return driver.ErrBadConn
case protocol.ErrRequest:

View File

@ -50,7 +50,7 @@ static dqlite_node_info *makeInfos(int n) {
return calloc(n, sizeof(dqlite_node_info));
}
static void setInfo(dqlite_node_info *infos, unsigned i, unsigned id, const char *address) {
static void setInfo(dqlite_node_info *infos, unsigned i, dqlite_node_id id, const char *address) {
dqlite_node_info *info = &infos[i];
info->id = id;
info->address = address;
@ -105,7 +105,7 @@ func ConfigMultiThread() error {
// NewNode creates a new Node instance.
func NewNode(id uint64, address string, dir string) (*Node, error) {
var server *C.dqlite_node
cid := C.unsigned(id)
cid := C.dqlite_node_id(id)
caddress := C.CString(address)
defer C.free(unsafe.Pointer(caddress))
@ -186,7 +186,7 @@ func (s *Node) Recover(cluster []protocol.NodeInfo) error {
infos := C.makeInfos(n)
defer C.free(unsafe.Pointer(infos))
for i, info := range cluster {
cid := C.unsigned(info.ID)
cid := C.dqlite_node_id(info.ID)
caddress := C.CString(info.Address)
defer C.free(unsafe.Pointer(caddress))
C.setInfo(infos, C.unsigned(i), cid, caddress)

View File

@ -6,6 +6,19 @@ const VersionOne = uint64(1)
// VersionLegacy is the pre 1.0 dqlite server protocol version.
const VersionLegacy = uint64(0x86104dd760433fe5)
// Cluster response formats
const (
ClusterFormatV0 = 0
ClusterFormatV1 = 1
)
// Node roles
const (
Voter = NodeRole(0)
StandBy = NodeRole(1)
Spare = NodeRole(2)
)
// SQLite datatype codes
const (
Integer = 1
@ -35,11 +48,12 @@ const (
RequestExecSQL = 8
RequestQuerySQL = 9
RequestInterrupt = 10
RequestJoin = 12
RequestPromote = 13
RequestAdd = 12
RequestAssign = 13
RequestRemove = 14
RequestDump = 15
RequestCluster = 16
RequestTransfer = 17
)
// Response types.

View File

@ -425,6 +425,7 @@ func (m *Message) getNodes() Nodes {
for i := 0; i < int(n); i++ {
servers[i].ID = m.getUint64()
servers[i].Address = m.getString()
servers[i].Role = NodeRole(m.getUint64())
}
return servers
@ -501,16 +502,25 @@ type Result struct {
type Rows struct {
Columns []string
message *Message
types []uint8
}
// columnTypes returns the row's column types
// if save is true, it will restore the buffer offset
func (r *Rows) columnTypes(save bool) ([]uint8, error) {
types := make([]uint8, len(r.Columns))
// use cached values if possible if not advancing the buffer offset
if save && r.types != nil {
return r.types, nil
}
// column types should never change between rows
// use cached copy to allow getting types when no more rows
if r.types == nil {
r.types = make([]uint8, len(r.Columns))
}
// Each column needs a 4 byte slot to store the column type. The row
// header must be padded to reach word boundary.
headerBits := len(types) * 4
headerBits := len(r.types) * 4
padBits := 0
if trailingBits := (headerBits % messageWordBits); trailingBits != 0 {
padBits = (messageWordBits - trailingBits)
@ -523,34 +533,40 @@ func (r *Rows) columnTypes(save bool) ([]uint8, error) {
if slot == 0xee {
// More rows are available.
return nil, ErrRowsPart
if save {
r.message.bufferForGet().Advance(-(i + 1))
}
return r.types, ErrRowsPart
}
if slot == 0xff {
// Rows EOF marker
return nil, io.EOF
if save {
r.message.bufferForGet().Advance(-(i + 1))
}
return r.types, io.EOF
}
index := i * 2
if index >= len(types) {
if index >= len(r.types) {
continue // This is padding.
}
types[index] = slot & 0x0f
r.types[index] = slot & 0x0f
index++
if index >= len(types) {
if index >= len(r.types) {
continue // This is padding byte.
}
types[index] = slot >> 4
r.types[index] = slot >> 4
}
if save {
r.message.bufferForGet().Advance(-headerSize)
r.message.bufferForGet().Advance(-headerSize)
}
return types, nil
return r.types, nil
}
// Next returns the next row in the result set.
@ -675,10 +691,7 @@ var iso8601Formats = []string{
// ColumnTypes returns the column types for the the result set.
func (r *Rows) ColumnTypes() ([]string, error) {
types, err := r.columnTypes(true)
if err != nil {
return nil, err
}
kinds := make([]string, len(r.Columns))
kinds := make([]string, len(types))
for i, t := range types {
switch t {
@ -703,5 +716,5 @@ func (r *Rows) ColumnTypes() ([]string, error) {
}
}
return kinds, nil
return kinds, err
}

View File

@ -94,19 +94,20 @@ func EncodeInterrupt(request *Message, db uint64) {
request.putHeader(RequestInterrupt)
}
// EncodeJoin encodes a Join request.
func EncodeJoin(request *Message, id uint64, address string) {
// EncodeAdd encodes a Add request.
func EncodeAdd(request *Message, id uint64, address string) {
request.putUint64(id)
request.putString(address)
request.putHeader(RequestJoin)
request.putHeader(RequestAdd)
}
// EncodePromote encodes a Promote request.
func EncodePromote(request *Message, id uint64) {
// EncodeAssign encodes a Assign request.
func EncodeAssign(request *Message, id uint64, role uint64) {
request.putUint64(id)
request.putUint64(role)
request.putHeader(RequestPromote)
request.putHeader(RequestAssign)
}
// EncodeRemove encodes a Remove request.
@ -124,8 +125,15 @@ func EncodeDump(request *Message, name string) {
}
// EncodeCluster encodes a Cluster request.
func EncodeCluster(request *Message) {
request.putUint64(0)
func EncodeCluster(request *Message, format uint64) {
request.putUint64(format)
request.putHeader(RequestCluster)
}
// EncodeTransfer encodes a Transfer request.
func EncodeTransfer(request *Message, id uint64) {
request.putUint64(id)
request.putHeader(RequestTransfer)
}

View File

@ -6,6 +6,7 @@ package protocol
import "fmt"
// DecodeFailure decodes a Failure response.
func DecodeFailure(response *Message) (code uint64, message string, err error) {
mtype, _ := response.getHeader()
@ -14,13 +15,13 @@ func DecodeFailure(response *Message) (code uint64, message string, err error) {
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseFailure {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
code = response.getUint64()
@ -37,13 +38,13 @@ func DecodeWelcome(response *Message) (heartbeatTimeout uint64, err error) {
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseWelcome {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
heartbeatTimeout = response.getUint64()
@ -59,13 +60,13 @@ func DecodeNodeLegacy(response *Message) (address string, err error) {
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseNodeLegacy {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
address = response.getString()
@ -81,13 +82,13 @@ func DecodeNode(response *Message) (id uint64, address string, err error) {
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseNode {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
id = response.getUint64()
@ -104,13 +105,13 @@ func DecodeNodes(response *Message) (servers Nodes, err error) {
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseNodes {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
servers = response.getNodes()
@ -126,13 +127,13 @@ func DecodeDb(response *Message) (id uint32, err error) {
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseDb {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
id = response.getUint32()
@ -149,13 +150,13 @@ func DecodeStmt(response *Message) (db uint32, id uint32, params uint64, err err
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseStmt {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
db = response.getUint32()
@ -173,13 +174,13 @@ func DecodeEmpty(response *Message) (err error) {
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseEmpty {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
response.getUint64()
@ -195,13 +196,13 @@ func DecodeResult(response *Message) (result Result, err error) {
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseResult {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
result = response.getResult()
@ -217,13 +218,13 @@ func DecodeRows(response *Message) (rows Rows, err error) {
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseRows {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
rows = response.getRows()
@ -239,13 +240,13 @@ func DecodeFiles(response *Message) (files Files, err error) {
e := ErrRequest{}
e.Code = response.getUint64()
e.Description = response.getString()
err = e
return
err = e
return
}
if mtype != ResponseFiles {
err = fmt.Errorf("unexpected response type %d", mtype)
return
return
}
files = response.getFiles()

View File

@ -12,12 +12,13 @@ package protocol
//go:generate ./schema.sh --request Finalize db:uint32 stmt:uint32
//go:generate ./schema.sh --request ExecSQL db:uint64 sql:string values:NamedValues
//go:generate ./schema.sh --request QuerySQL db:uint64 sql:string values:NamedValues
//go:generate ./schema.sh --request Interrupt db:uint64
//go:generate ./schema.sh --request Join id:uint64 address:string
//go:generate ./schema.sh --request Promote id:uint64
//go:generate ./schema.sh --request Interrupt db:uint64
//go:generate ./schema.sh --request Add id:uint64 address:string
//go:generate ./schema.sh --request Assign id:uint64 role:uint64
//go:generate ./schema.sh --request Remove id:uint64
//go:generate ./schema.sh --request Dump name:string
//go:generate ./schema.sh --request Cluster unused:uint64
//go:generate ./schema.sh --request Cluster format:uint64
//go:generate ./schema.sh --request Transfer id:uint64
//go:generate ./schema.sh --response init
//go:generate ./schema.sh --response Failure code:uint64 message:string

View File

@ -4,10 +4,28 @@ import (
"context"
)
// NodeRole identifies the role of a node.
type NodeRole int
// String implements the Stringer interface.
func (r NodeRole) String() string {
switch r {
case Voter:
return "voter"
case StandBy:
return "stand-by"
case Spare:
return "spare"
default:
return "unknown role"
}
}
// NodeInfo holds information about a single server.
type NodeInfo struct {
ID uint64
Address string
Role NodeRole
}
// NodeStore is used by a dqlite client to get an initial list of candidate

2
vendor/modules.txt vendored
View File

@ -131,7 +131,7 @@ github.com/blang/semver
github.com/bronze1man/goStrongswanVici
# github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23
github.com/buger/jsonparser
# github.com/canonical/go-dqlite v1.2.0
# github.com/canonical/go-dqlite v1.3.0
github.com/canonical/go-dqlite
github.com/canonical/go-dqlite/client
github.com/canonical/go-dqlite/driver