You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/snapshot/archive.go

237 lines
6.9 KiB

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1 year ago
// SPDX-License-Identifier: BUSL-1.1
Adds support for snapshots and restores. (#2396) * Updates Raft library to get new snapshot/restore API. * Basic backup and restore working, but need some cleanup. * Breaks out a snapshot module and adds a SHA256 integrity check. * Adds snapshot ACL and fills in some missing comments. * Require a consistent read for snapshots. * Make sure snapshot works if ACLs aren't enabled. * Adds a bit of package documentation. * Returns an empty response from restore to avoid EOF errors. * Adds API client support for snapshots. * Makes internal file names match on-disk file snapshots. * Adds DC and token coverage for snapshot API test. * Adds missing documentation. * Adds a unit test for the snapshot client endpoint. * Moves the connection pool out of the client for easier testing. * Fixes an incidental issue in the prepared query unit test. I realized I had two servers in bootstrap mode so this wasn't a good setup. * Adds a half close to the TCP stream and fixes panic on error. * Adds client and endpoint tests for snapshots. * Moves the pool back into the snapshot RPC client. * Adds a TLS test and fixes half-closes for TLS connections. * Tweaks some comments. * Adds a low-level snapshot test. This is independent of Consul so we can pull this out into a library later if we want to. * Cleans up snapshot and archive and completes archive tests. * Sends a clear error for snapshot operations in dev mode. Snapshots require the Raft snapshots to be readable, which isn't supported in dev mode. Send a clear error instead of a deep-down Raft one. * Adds docs for the snapshot endpoint. * Adds a stale mode and index feedback for snapshot saves. This gives folks a way to extract data even if the cluster has no leader. * Changes the internal format of a snapshot from zip to tgz. * Pulls in Raft fix to cancel inflight before a restore. * Pulls in new Raft restore interface. * Adds metadata to snapshot saves and a verify function. * Adds basic save and restore snapshot CLI commands. * Gets rid of tarball extensions and adds restore message. * Fixes an incidental bad link in the KV docs. * Adds documentation for the snapshot CLI commands. * Scuttle any request body when a snapshot is saved. * Fixes archive unit test error message check. * Allows for nil output writers in snapshot RPC handlers. * Renames hash list Decode to DecodeAndVerify. * Closes the client connection for snapshot ops. * Lowers timeout for restore ops. * Updates Raft vendor to get new Restore signature and integrates with Consul. * Bounces the leader's internal state when we do a restore.
8 years ago
// The archive utilities manage the internal format of a snapshot, which is a
// tar file with the following contents:
//
// meta.json - JSON-encoded snapshot metadata from Raft
// state.bin - Encoded snapshot data from Raft
// SHA256SUMS - SHA-256 sums of the above two files
//
// The integrity information is automatically created and checked, and a failure
// there just looks like an error to the caller.
package snapshot
import (
"archive/tar"
"bufio"
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"hash"
"io"
"time"
"github.com/hashicorp/raft"
)
// hashList manages a list of filenames and their hashes.
type hashList struct {
hashes map[string]hash.Hash
}
// newHashList returns a new hashList.
func newHashList() *hashList {
return &hashList{
hashes: make(map[string]hash.Hash),
}
}
// Add creates a new hash for the given file.
func (hl *hashList) Add(file string) hash.Hash {
if existing, ok := hl.hashes[file]; ok {
return existing
}
h := sha256.New()
hl.hashes[file] = h
return h
}
// Encode takes the current sum of all the hashes and saves the hash list as a
// SHA256SUMS-style text file.
func (hl *hashList) Encode(w io.Writer) error {
for file, h := range hl.hashes {
if _, err := fmt.Fprintf(w, "%x %s\n", h.Sum([]byte{}), file); err != nil {
return err
}
}
return nil
}
// DecodeAndVerify reads a SHA256SUMS-style text file and checks the results
// against the current sums for all the hashes.
func (hl *hashList) DecodeAndVerify(r io.Reader) error {
// Read the file and make sure everything in there has a matching hash.
seen := make(map[string]struct{})
s := bufio.NewScanner(r)
for s.Scan() {
sha := make([]byte, sha256.Size)
var file string
if _, err := fmt.Sscanf(s.Text(), "%x %s", &sha, &file); err != nil {
return err
}
h, ok := hl.hashes[file]
if !ok {
return fmt.Errorf("list missing hash for %q", file)
}
if !bytes.Equal(sha, h.Sum([]byte{})) {
return fmt.Errorf("hash check failed for %q", file)
}
seen[file] = struct{}{}
}
if err := s.Err(); err != nil {
return err
}
// Make sure everything we had a hash for was seen.
for file := range hl.hashes {
Adds support for snapshots and restores. (#2396) * Updates Raft library to get new snapshot/restore API. * Basic backup and restore working, but need some cleanup. * Breaks out a snapshot module and adds a SHA256 integrity check. * Adds snapshot ACL and fills in some missing comments. * Require a consistent read for snapshots. * Make sure snapshot works if ACLs aren't enabled. * Adds a bit of package documentation. * Returns an empty response from restore to avoid EOF errors. * Adds API client support for snapshots. * Makes internal file names match on-disk file snapshots. * Adds DC and token coverage for snapshot API test. * Adds missing documentation. * Adds a unit test for the snapshot client endpoint. * Moves the connection pool out of the client for easier testing. * Fixes an incidental issue in the prepared query unit test. I realized I had two servers in bootstrap mode so this wasn't a good setup. * Adds a half close to the TCP stream and fixes panic on error. * Adds client and endpoint tests for snapshots. * Moves the pool back into the snapshot RPC client. * Adds a TLS test and fixes half-closes for TLS connections. * Tweaks some comments. * Adds a low-level snapshot test. This is independent of Consul so we can pull this out into a library later if we want to. * Cleans up snapshot and archive and completes archive tests. * Sends a clear error for snapshot operations in dev mode. Snapshots require the Raft snapshots to be readable, which isn't supported in dev mode. Send a clear error instead of a deep-down Raft one. * Adds docs for the snapshot endpoint. * Adds a stale mode and index feedback for snapshot saves. This gives folks a way to extract data even if the cluster has no leader. * Changes the internal format of a snapshot from zip to tgz. * Pulls in Raft fix to cancel inflight before a restore. * Pulls in new Raft restore interface. * Adds metadata to snapshot saves and a verify function. * Adds basic save and restore snapshot CLI commands. * Gets rid of tarball extensions and adds restore message. * Fixes an incidental bad link in the KV docs. * Adds documentation for the snapshot CLI commands. * Scuttle any request body when a snapshot is saved. * Fixes archive unit test error message check. * Allows for nil output writers in snapshot RPC handlers. * Renames hash list Decode to DecodeAndVerify. * Closes the client connection for snapshot ops. * Lowers timeout for restore ops. * Updates Raft vendor to get new Restore signature and integrates with Consul. * Bounces the leader's internal state when we do a restore.
8 years ago
if _, ok := seen[file]; !ok {
return fmt.Errorf("file missing for %q", file)
}
}
return nil
}
// write takes a writer and creates an archive with the snapshot metadata,
// the snapshot itself, and adds some integrity checking information.
func write(out io.Writer, metadata *raft.SnapshotMeta, snap io.Reader) error {
// Start a new tarball.
now := time.Now()
archive := tar.NewWriter(out)
// Create a hash list that we will use to write a SHA256SUMS file into
// the archive.
hl := newHashList()
// Encode the snapshot metadata, which we need to feed back during a
// restore.
metaHash := hl.Add("meta.json")
var metaBuffer bytes.Buffer
enc := json.NewEncoder(&metaBuffer)
if err := enc.Encode(metadata); err != nil {
return fmt.Errorf("failed to encode snapshot metadata: %v", err)
}
if err := archive.WriteHeader(&tar.Header{
Name: "meta.json",
Mode: 0600,
Size: int64(metaBuffer.Len()),
ModTime: now,
}); err != nil {
return fmt.Errorf("failed to write snapshot metadata header: %v", err)
}
if _, err := io.Copy(archive, io.TeeReader(&metaBuffer, metaHash)); err != nil {
return fmt.Errorf("failed to write snapshot metadata: %v", err)
}
// Copy the snapshot data given the size from the metadata.
snapHash := hl.Add("state.bin")
if err := archive.WriteHeader(&tar.Header{
Name: "state.bin",
Mode: 0600,
Size: metadata.Size,
ModTime: now,
}); err != nil {
return fmt.Errorf("failed to write snapshot data header: %v", err)
}
if _, err := io.CopyN(archive, io.TeeReader(snap, snapHash), metadata.Size); err != nil {
return fmt.Errorf("failed to write snapshot metadata: %v", err)
}
// Create a SHA256SUMS file that we can use to verify on restore.
var shaBuffer bytes.Buffer
if err := hl.Encode(&shaBuffer); err != nil {
return fmt.Errorf("failed to encode snapshot hashes: %v", err)
}
if err := archive.WriteHeader(&tar.Header{
Name: "SHA256SUMS",
Mode: 0600,
Size: int64(shaBuffer.Len()),
ModTime: now,
}); err != nil {
return fmt.Errorf("failed to write snapshot hashes header: %v", err)
}
if _, err := io.Copy(archive, &shaBuffer); err != nil {
return fmt.Errorf("failed to write snapshot metadata: %v", err)
}
// Finalize the archive.
if err := archive.Close(); err != nil {
return fmt.Errorf("failed to finalize snapshot: %v", err)
}
return nil
}
// read takes a reader and extracts the snapshot metadata and the snapshot
// itself, and also checks the integrity of the data. You must arrange to call
// Close() on the returned object or else you will leak a temporary file.
func read(in io.Reader, metadata *raft.SnapshotMeta, snap io.Writer) error {
// Start a new tar reader.
archive := tar.NewReader(in)
// Create a hash list that we will use to compare with the SHA256SUMS
// file in the archive.
hl := newHashList()
// Populate the hashes for all the files we expect to see. The check at
// the end will make sure these are all present in the SHA256SUMS file
// and that the hashes match.
metaHash := hl.Add("meta.json")
snapHash := hl.Add("state.bin")
// Look through the archive for the pieces we care about.
var shaBuffer bytes.Buffer
for {
hdr, err := archive.Next()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed reading snapshot: %v", err)
}
switch hdr.Name {
case "meta.json":
// Previously we used json.Decode to decode the archive stream. There are
// edgecases in which it doesn't read all the bytes from the stream, even
// though the json object is still being parsed properly. Since we
// simultaneously feeded everything to metaHash, our hash ended up being
// different than what we calculated when creating the snapshot. Which in
// turn made the snapshot verification fail. By explicitly reading the
// whole thing first we ensure that we calculate the correct hash
// independent of how json.Decode works internally.
buf, err := io.ReadAll(io.TeeReader(archive, metaHash))
if err != nil {
return fmt.Errorf("failed to read snapshot metadata: %v", err)
}
if err := json.Unmarshal(buf, &metadata); err != nil {
Adds support for snapshots and restores. (#2396) * Updates Raft library to get new snapshot/restore API. * Basic backup and restore working, but need some cleanup. * Breaks out a snapshot module and adds a SHA256 integrity check. * Adds snapshot ACL and fills in some missing comments. * Require a consistent read for snapshots. * Make sure snapshot works if ACLs aren't enabled. * Adds a bit of package documentation. * Returns an empty response from restore to avoid EOF errors. * Adds API client support for snapshots. * Makes internal file names match on-disk file snapshots. * Adds DC and token coverage for snapshot API test. * Adds missing documentation. * Adds a unit test for the snapshot client endpoint. * Moves the connection pool out of the client for easier testing. * Fixes an incidental issue in the prepared query unit test. I realized I had two servers in bootstrap mode so this wasn't a good setup. * Adds a half close to the TCP stream and fixes panic on error. * Adds client and endpoint tests for snapshots. * Moves the pool back into the snapshot RPC client. * Adds a TLS test and fixes half-closes for TLS connections. * Tweaks some comments. * Adds a low-level snapshot test. This is independent of Consul so we can pull this out into a library later if we want to. * Cleans up snapshot and archive and completes archive tests. * Sends a clear error for snapshot operations in dev mode. Snapshots require the Raft snapshots to be readable, which isn't supported in dev mode. Send a clear error instead of a deep-down Raft one. * Adds docs for the snapshot endpoint. * Adds a stale mode and index feedback for snapshot saves. This gives folks a way to extract data even if the cluster has no leader. * Changes the internal format of a snapshot from zip to tgz. * Pulls in Raft fix to cancel inflight before a restore. * Pulls in new Raft restore interface. * Adds metadata to snapshot saves and a verify function. * Adds basic save and restore snapshot CLI commands. * Gets rid of tarball extensions and adds restore message. * Fixes an incidental bad link in the KV docs. * Adds documentation for the snapshot CLI commands. * Scuttle any request body when a snapshot is saved. * Fixes archive unit test error message check. * Allows for nil output writers in snapshot RPC handlers. * Renames hash list Decode to DecodeAndVerify. * Closes the client connection for snapshot ops. * Lowers timeout for restore ops. * Updates Raft vendor to get new Restore signature and integrates with Consul. * Bounces the leader's internal state when we do a restore.
8 years ago
return fmt.Errorf("failed to decode snapshot metadata: %v", err)
}
case "state.bin":
if _, err := io.Copy(io.MultiWriter(snap, snapHash), archive); err != nil {
return fmt.Errorf("failed to read or write snapshot data: %v", err)
}
case "SHA256SUMS":
if _, err := io.Copy(&shaBuffer, archive); err != nil {
return fmt.Errorf("failed to read snapshot hashes: %v", err)
}
default:
return fmt.Errorf("unexpected file %q in snapshot", hdr.Name)
}
}
// Verify all the hashes.
if err := hl.DecodeAndVerify(&shaBuffer); err != nil {
return fmt.Errorf("failed checking integrity of snapshot: %v", err)
}
return nil
}