You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/internal/storage/raft/backend.go

345 lines
10 KiB

package raft
import (
"context"
"fmt"
"net"
"strconv"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/internal/storage/inmem"
"github.com/hashicorp/consul/proto-public/pbresource"
pbstorage "github.com/hashicorp/consul/proto/private/pbstorage"
)
// NewBackend returns a storage backend that uses Raft for durable persistence
// and serves reads from an in-memory database. It's suitable for production use.
//
// It's not an entirely clean abstraction because rather than owning the Raft
// subsystem directly, it has to integrate with the existing FSM and related
// machinery from before generic resources.
//
// The given Handle will be used to apply logs and interrogate leadership state.
// In certain restricted circumstances, Handle may be nil, such as during tests
// that only exercise snapshot restoration, or when initializing a throwaway FSM
// during peers.json recovery - but calling any of the data access methods (read
// or write) will result in a panic.
//
// With Raft, writes and strongly consistent reads must be done on the leader.
// Backend implements a gRPC server, which followers will use to transparently
// forward operations to the leader. To do so, they will obtain a connection
// using Handle.DialLeader. Connections are cached for re-use, so when there's
// a new leader, you must call LeaderChanged to refresh the connection. Leaders
// must accept connections and hand them off by calling Backend.HandleConnection.
// Backend's gRPC client and server *DO NOT* handle TLS themselves, as they are
// intended to communicate over Consul's multiplexed server port (which handles
// TLS).
//
// You must call Run before using the backend.
func NewBackend(h Handle, l hclog.Logger) (*Backend, error) {
s, err := inmem.NewStore()
if err != nil {
return nil, err
}
b := &Backend{handle: h, store: s}
b.forwardingServer = newForwardingServer(b)
b.forwardingClient = newForwardingClient(h, l)
return b, nil
}
// Handle provides glue for interacting with the Raft subsystem via existing
// machinery on consul.Server.
type Handle interface {
// Apply the given log message.
Apply(msg []byte) (any, error)
// IsLeader determines if this server is the Raft leader (so can handle writes).
IsLeader() bool
// EnsureStrongConsistency checks the server is able to handle consistent reads by
// verifying its leadership and checking the FSM has applied all queued writes.
EnsureStrongConsistency(ctx context.Context) error
// DialLeader dials a gRPC connection to the leader for forwarding.
DialLeader() (*grpc.ClientConn, error)
}
// Backend is a Raft-backed storage backend implementation.
type Backend struct {
handle Handle
store *inmem.Store
forwardingServer *forwardingServer
forwardingClient *forwardingClient
}
// Run until the given context is canceled. This method blocks, so should be
// called in a goroutine.
func (b *Backend) Run(ctx context.Context) {
group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
b.store.Run(groupCtx)
return nil
})
group.Go(func() error {
return b.forwardingServer.run(groupCtx)
})
group.Wait()
}
// Read implements the storage.Backend interface.
func (b *Backend) Read(ctx context.Context, consistency storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) {
// Easy case. Both leaders and followers can read from the local store.
if consistency == storage.EventualConsistency {
return b.store.Read(id)
}
if consistency != storage.StrongConsistency {
return nil, fmt.Errorf("%w: unknown consistency: %s", storage.ErrInconsistent, consistency)
}
// We are the leader. Handle the request ourself.
if b.handle.IsLeader() {
return b.leaderRead(ctx, id)
}
// Forward the request to the leader.
rsp, err := b.forwardingClient.read(ctx, &pbstorage.ReadRequest{Id: id})
if err != nil {
return nil, err
}
return rsp.GetResource(), nil
}
func (b *Backend) leaderRead(ctx context.Context, id *pbresource.ID) (*pbresource.Resource, error) {
if err := b.ensureStrongConsistency(ctx); err != nil {
return nil, err
}
return b.store.Read(id)
}
// WriteCAS implements the storage.Backend interface.
func (b *Backend) WriteCAS(ctx context.Context, res *pbresource.Resource) (*pbresource.Resource, error) {
req := &pbstorage.WriteRequest{Resource: res}
if b.handle.IsLeader() {
rsp, err := b.raftApply(&pbstorage.Log{
Type: pbstorage.LogType_LOG_TYPE_WRITE,
Request: &pbstorage.Log_Write{
Write: req,
},
})
if err != nil {
return nil, err
}
return rsp.GetWrite().GetResource(), nil
}
rsp, err := b.forwardingClient.write(ctx, req)
if err != nil {
return nil, err
}
return rsp.GetResource(), nil
}
// DeleteCAS implements the storage.Backend interface.
func (b *Backend) DeleteCAS(ctx context.Context, id *pbresource.ID, version string) error {
req := &pbstorage.DeleteRequest{
Id: id,
Version: version,
}
if b.handle.IsLeader() {
_, err := b.raftApply(&pbstorage.Log{
Type: pbstorage.LogType_LOG_TYPE_DELETE,
Request: &pbstorage.Log_Delete{
Delete: req,
},
})
return err
}
return b.forwardingClient.delete(ctx, req)
}
// List implements the storage.Backend interface.
func (b *Backend) List(ctx context.Context, consistency storage.ReadConsistency, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
// Easy case. Both leaders and followers can read from the local store.
if consistency == storage.EventualConsistency {
return b.store.List(resType, tenancy, namePrefix)
}
if consistency != storage.StrongConsistency {
return nil, fmt.Errorf("%w: unknown consistency: %s", storage.ErrInconsistent, consistency)
}
// We are the leader. Handle the request ourself.
if b.handle.IsLeader() {
return b.leaderList(ctx, resType, tenancy, namePrefix)
}
// Forward the request to the leader.
rsp, err := b.forwardingClient.list(ctx, &pbstorage.ListRequest{
Type: &pbresource.Type{
Group: resType.Group,
Kind: resType.Kind,
},
Tenancy: tenancy,
NamePrefix: namePrefix,
})
if err != nil {
return nil, err
}
return rsp.GetResources(), nil
}
func (b *Backend) leaderList(ctx context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
if err := b.ensureStrongConsistency(ctx); err != nil {
return nil, err
}
return b.store.List(resType, tenancy, namePrefix)
}
// WatchList implements the storage.Backend interface.
func (b *Backend) WatchList(_ context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (storage.Watch, error) {
return b.store.WatchList(resType, tenancy, namePrefix)
}
// OwnerReferences implements the storage.Backend interface.
func (b *Backend) OwnerReferences(_ context.Context, id *pbresource.ID) ([]*pbresource.ID, error) {
return b.store.OwnerReferences(id)
}
// Apply is called by the FSM with the bytes of a Raft log entry, with Consul's
// envelope (i.e. type prefix and msgpack wrapper) stripped off.
func (b *Backend) Apply(buf []byte, idx uint64) any {
var req pbstorage.Log
if err := req.UnmarshalBinary(buf); err != nil {
return fmt.Errorf("failed to decode request: %w", err)
}
switch req.Type {
case pbstorage.LogType_LOG_TYPE_WRITE:
res := req.GetWrite().GetResource()
oldVsn := res.Version
res.Version = strconv.Itoa(int(idx))
if err := b.store.WriteCAS(res, oldVsn); err != nil {
return err
}
return &pbstorage.LogResponse{
Response: &pbstorage.LogResponse_Write{
Write: &pbstorage.WriteResponse{Resource: res},
},
}
case pbstorage.LogType_LOG_TYPE_DELETE:
req := req.GetDelete()
if err := b.store.DeleteCAS(req.Id, req.Version); err != nil {
return err
}
return &pbstorage.LogResponse{
Response: &pbstorage.LogResponse_Delete{},
}
}
return fmt.Errorf("unexpected request type: %s", req.Type)
}
// LeaderChanged should be called whenever the current Raft leader changes, to
// drop and re-create the gRPC connection used for forwarding.
func (b *Backend) LeaderChanged() { b.forwardingClient.leaderChanged() }
// HandleConnection should be called whenever a forwarding connection is opened.
func (b *Backend) HandleConnection(conn net.Conn) { b.forwardingServer.listener.Handle(conn) }
// raftApply round trips the given request through the Raft log and FSM.
func (b *Backend) raftApply(req *pbstorage.Log) (*pbstorage.LogResponse, error) {
msg, err := req.MarshalBinary()
if err != nil {
return nil, err
}
rsp, err := b.handle.Apply(msg)
if err != nil {
return nil, err
}
switch t := rsp.(type) {
case *pbstorage.LogResponse:
return t, nil
default:
return nil, fmt.Errorf("unexpected response from Raft apply: %T", rsp)
}
}
func (b *Backend) ensureStrongConsistency(ctx context.Context) error {
if err := b.handle.EnsureStrongConsistency(ctx); err != nil {
return fmt.Errorf("%w: %v", storage.ErrInconsistent, err)
}
return nil
}
// Snapshot obtains a point-in-time snapshot of the backend's state, so that it
// can be written to disk as a backup or sent to bootstrap a follower.
func (b *Backend) Snapshot() (*Snapshot, error) {
s, err := b.store.Snapshot()
if err != nil {
return nil, err
}
return &Snapshot{s}, nil
}
// Snapshot is a point-in-time snapshot of a backend's state.
type Snapshot struct{ s *inmem.Snapshot }
// Next returns the next resource in the snapshot, protobuf encoded. nil bytes
// will be returned when the end of the snapshot has been reached.
func (s *Snapshot) Next() ([]byte, error) {
res := s.s.Next()
if res == nil {
return nil, nil
}
return res.MarshalBinary()
}
// Restore starts the process of restoring a snapshot (i.e. from an on-disk
// backup, or to bootstrap from a leader).
//
// Callers *must* call Abort or Commit when done, to free resources.
func (b *Backend) Restore() (*Restoration, error) {
r, err := b.store.Restore()
if err != nil {
return nil, err
}
return &Restoration{r}, nil
}
// Restoration is a handle that can be used to restore a snapshot.
type Restoration struct{ r *inmem.Restoration }
// Apply the given protobuf-encoded resource to the backend.
func (r *Restoration) Apply(msg []byte) error {
var res pbresource.Resource
if err := res.UnmarshalBinary(msg); err != nil {
return err
}
return r.r.Apply(&res)
}
// Commit the restoration.
func (r *Restoration) Commit() { r.r.Commit() }
// Abort the restoration. It's safe to always call this in a defer statement
// because aborting a committed restoration is a no-op.
func (r *Restoration) Abort() { r.r.Abort() }