You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/internal/storage/raft/backend.go

351 lines
11 KiB

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1 year ago
// SPDX-License-Identifier: BUSL-1.1
package raft
import (
"context"
"fmt"
"net"
"strconv"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/internal/storage/inmem"
"github.com/hashicorp/consul/proto-public/pbresource"
pbstorage "github.com/hashicorp/consul/proto/private/pbstorage"
)
// NewBackend returns a storage backend that uses Raft for durable persistence
// and serves reads from an in-memory database. It's suitable for production use.
//
// It's not an entirely clean abstraction because rather than owning the Raft
// subsystem directly, it has to integrate with the existing FSM and related
// machinery from before generic resources.
//
// The given Handle will be used to apply logs and interrogate leadership state.
// In certain restricted circumstances, Handle may be nil, such as during tests
// that only exercise snapshot restoration, or when initializing a throwaway FSM
// during peers.json recovery - but calling any of the data access methods (read
// or write) will result in a panic.
//
// With Raft, writes and strongly consistent reads must be done on the leader.
// Backend implements a gRPC server, which followers will use to transparently
// forward operations to the leader. To do so, they will obtain a connection
// using Handle.DialLeader. Connections are cached for re-use, so when there's
// a new leader, you must call LeaderChanged to refresh the connection. Leaders
// must accept connections and hand them off by calling Backend.HandleConnection.
// Backend's gRPC client and server *DO NOT* handle TLS themselves, as they are
// intended to communicate over Consul's multiplexed server port (which handles
// TLS).
//
// For more information, see here:
// https://github.com/hashicorp/consul/tree/main/docs/resources#raft-storage-backend
//
// You must call Run before using the backend.
func NewBackend(h Handle, l hclog.Logger) (*Backend, error) {
s, err := inmem.NewStore()
if err != nil {
return nil, err
}
b := &Backend{handle: h, store: s}
b.forwardingServer = newForwardingServer(b)
b.forwardingClient = newForwardingClient(h, l)
return b, nil
}
// Handle provides glue for interacting with the Raft subsystem via existing
// machinery on consul.Server.
type Handle interface {
// Apply the given log message.
Apply(msg []byte) (any, error)
// IsLeader determines if this server is the Raft leader (so can handle writes).
IsLeader() bool
// EnsureStrongConsistency checks the server is able to handle consistent reads by
// verifying its leadership and checking the FSM has applied all queued writes.
EnsureStrongConsistency(ctx context.Context) error
// DialLeader dials a gRPC connection to the leader for forwarding.
DialLeader() (*grpc.ClientConn, error)
}
// Backend is a Raft-backed storage backend implementation.
type Backend struct {
handle Handle
store *inmem.Store
forwardingServer *forwardingServer
forwardingClient *forwardingClient
}
// Run until the given context is canceled. This method blocks, so should be
// called in a goroutine.
func (b *Backend) Run(ctx context.Context) {
group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
b.store.Run(groupCtx)
return nil
})
group.Go(func() error {
return b.forwardingServer.run(groupCtx)
})
group.Wait()
}
// Read implements the storage.Backend interface.
func (b *Backend) Read(ctx context.Context, consistency storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) {
// Easy case. Both leaders and followers can read from the local store.
if consistency == storage.EventualConsistency {
return b.store.Read(id)
}
if consistency != storage.StrongConsistency {
return nil, fmt.Errorf("%w: unknown consistency: %s", storage.ErrInconsistent, consistency)
}
// We are the leader. Handle the request ourself.
if b.handle.IsLeader() {
return b.leaderRead(ctx, id)
}
// Forward the request to the leader.
rsp, err := b.forwardingClient.read(ctx, &pbstorage.ReadRequest{Id: id})
if err != nil {
return nil, err
}
return rsp.GetResource(), nil
}
func (b *Backend) leaderRead(ctx context.Context, id *pbresource.ID) (*pbresource.Resource, error) {
if err := b.ensureStrongConsistency(ctx); err != nil {
return nil, err
}
return b.store.Read(id)
}
// WriteCAS implements the storage.Backend interface.
func (b *Backend) WriteCAS(ctx context.Context, res *pbresource.Resource) (*pbresource.Resource, error) {
req := &pbstorage.WriteRequest{Resource: res}
if b.handle.IsLeader() {
rsp, err := b.raftApply(&pbstorage.Log{
Type: pbstorage.LogType_LOG_TYPE_WRITE,
Request: &pbstorage.Log_Write{
Write: req,
},
})
if err != nil {
return nil, err
}
return rsp.GetWrite().GetResource(), nil
}
rsp, err := b.forwardingClient.write(ctx, req)
if err != nil {
return nil, err
}
return rsp.GetResource(), nil
}
// DeleteCAS implements the storage.Backend interface.
func (b *Backend) DeleteCAS(ctx context.Context, id *pbresource.ID, version string) error {
req := &pbstorage.DeleteRequest{
Id: id,
Version: version,
}
if b.handle.IsLeader() {
_, err := b.raftApply(&pbstorage.Log{
Type: pbstorage.LogType_LOG_TYPE_DELETE,
Request: &pbstorage.Log_Delete{
Delete: req,
},
})
return err
}
return b.forwardingClient.delete(ctx, req)
}
// List implements the storage.Backend interface.
func (b *Backend) List(ctx context.Context, consistency storage.ReadConsistency, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
// Easy case. Both leaders and followers can read from the local store.
if consistency == storage.EventualConsistency {
return b.store.List(resType, tenancy, namePrefix)
}
if consistency != storage.StrongConsistency {
return nil, fmt.Errorf("%w: unknown consistency: %s", storage.ErrInconsistent, consistency)
}
// We are the leader. Handle the request ourself.
if b.handle.IsLeader() {
return b.leaderList(ctx, resType, tenancy, namePrefix)
}
// Forward the request to the leader.
rsp, err := b.forwardingClient.list(ctx, &pbstorage.ListRequest{
Type: &pbresource.Type{
Group: resType.Group,
Kind: resType.Kind,
},
Tenancy: tenancy,
NamePrefix: namePrefix,
})
if err != nil {
return nil, err
}
return rsp.GetResources(), nil
}
func (b *Backend) leaderList(ctx context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
if err := b.ensureStrongConsistency(ctx); err != nil {
return nil, err
}
return b.store.List(resType, tenancy, namePrefix)
}
// WatchList implements the storage.Backend interface.
func (b *Backend) WatchList(_ context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (storage.Watch, error) {
return b.store.WatchList(resType, tenancy, namePrefix)
}
// ListByOwner implements the storage.Backend interface.
func (b *Backend) ListByOwner(_ context.Context, id *pbresource.ID) ([]*pbresource.Resource, error) {
return b.store.ListByOwner(id)
}
// Apply is called by the FSM with the bytes of a Raft log entry, with Consul's
// envelope (i.e. type prefix and msgpack wrapper) stripped off.
func (b *Backend) Apply(buf []byte, idx uint64) any {
var req pbstorage.Log
if err := req.UnmarshalBinary(buf); err != nil {
return fmt.Errorf("failed to decode request: %w", err)
}
switch req.Type {
case pbstorage.LogType_LOG_TYPE_WRITE:
res := req.GetWrite().GetResource()
oldVsn := res.Version
res.Version = strconv.Itoa(int(idx))
if err := b.store.WriteCAS(res, oldVsn); err != nil {
return err
}
return &pbstorage.LogResponse{
Response: &pbstorage.LogResponse_Write{
Write: &pbstorage.WriteResponse{Resource: res},
},
}
case pbstorage.LogType_LOG_TYPE_DELETE:
req := req.GetDelete()
if err := b.store.DeleteCAS(req.Id, req.Version); err != nil {
return err
}
return &pbstorage.LogResponse{
Response: &pbstorage.LogResponse_Delete{},
}
}
return fmt.Errorf("unexpected request type: %s", req.Type)
}
// LeaderChanged should be called whenever the current Raft leader changes, to
// drop and re-create the gRPC connection used for forwarding.
func (b *Backend) LeaderChanged() { b.forwardingClient.leaderChanged() }
// HandleConnection should be called whenever a forwarding connection is opened.
func (b *Backend) HandleConnection(conn net.Conn) { b.forwardingServer.listener.Handle(conn) }
// raftApply round trips the given request through the Raft log and FSM.
func (b *Backend) raftApply(req *pbstorage.Log) (*pbstorage.LogResponse, error) {
msg, err := req.MarshalBinary()
if err != nil {
return nil, err
}
rsp, err := b.handle.Apply(msg)
if err != nil {
return nil, err
}
switch t := rsp.(type) {
case *pbstorage.LogResponse:
return t, nil
default:
return nil, fmt.Errorf("unexpected response from Raft apply: %T", rsp)
}
}
func (b *Backend) ensureStrongConsistency(ctx context.Context) error {
if err := b.handle.EnsureStrongConsistency(ctx); err != nil {
return fmt.Errorf("%w: %v", storage.ErrInconsistent, err)
}
return nil
}
// Snapshot obtains a point-in-time snapshot of the backend's state, so that it
// can be written to disk as a backup or sent to bootstrap a follower.
func (b *Backend) Snapshot() (*Snapshot, error) {
s, err := b.store.Snapshot()
if err != nil {
return nil, err
}
return &Snapshot{s}, nil
}
// Snapshot is a point-in-time snapshot of a backend's state.
type Snapshot struct{ s *inmem.Snapshot }
// Next returns the next resource in the snapshot, protobuf encoded. nil bytes
// will be returned when the end of the snapshot has been reached.
func (s *Snapshot) Next() ([]byte, error) {
res := s.s.Next()
if res == nil {
return nil, nil
}
return res.MarshalBinary()
}
// Restore starts the process of restoring a snapshot (i.e. from an on-disk
// backup, or to bootstrap from a leader).
//
// Callers *must* call Abort or Commit when done, to free resources.
func (b *Backend) Restore() (*Restoration, error) {
r, err := b.store.Restore()
if err != nil {
return nil, err
}
return &Restoration{r}, nil
}
// Restoration is a handle that can be used to restore a snapshot.
type Restoration struct{ r *inmem.Restoration }
// Apply the given protobuf-encoded resource to the backend.
func (r *Restoration) Apply(msg []byte) error {
var res pbresource.Resource
if err := res.UnmarshalBinary(msg); err != nil {
return err
}
return r.r.Apply(&res)
}
// Commit the restoration.
func (r *Restoration) Commit() { r.r.Commit() }
// Abort the restoration. It's safe to always call this in a defer statement
// because aborting a committed restoration is a no-op.
func (r *Restoration) Abort() { r.r.Abort() }