mirror of https://github.com/hashicorp/consul
606 lines
17 KiB
Go
606 lines
17 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package sprawl
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/base64"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
retry "github.com/avast/retry-go"
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/go-multierror"
|
|
|
|
"github.com/hashicorp/consul/testing/deployer/sprawl/internal/build"
|
|
"github.com/hashicorp/consul/testing/deployer/sprawl/internal/secrets"
|
|
"github.com/hashicorp/consul/testing/deployer/sprawl/internal/tfgen"
|
|
"github.com/hashicorp/consul/testing/deployer/topology"
|
|
"github.com/hashicorp/consul/testing/deployer/util"
|
|
)
|
|
|
|
const (
|
|
sharedBootstrapToken = "root"
|
|
// sharedBootstrapToken = "ec59aa56-1996-4ff1-911a-f5d782552a13"
|
|
|
|
sharedAgentRecoveryToken = "22082b05-05c9-4a0a-b3da-b9685ac1d688"
|
|
)
|
|
|
|
type LaunchPhase int
|
|
|
|
const (
|
|
LaunchPhaseRegular LaunchPhase = iota
|
|
LaunchPhaseUpgrade
|
|
)
|
|
|
|
func (lp LaunchPhase) String() string {
|
|
phaseStr := ""
|
|
switch lp {
|
|
case LaunchPhaseRegular:
|
|
phaseStr = "regular"
|
|
case LaunchPhaseUpgrade:
|
|
phaseStr = "upgrade"
|
|
}
|
|
return phaseStr
|
|
}
|
|
|
|
func (s *Sprawl) launch() error {
|
|
return s.launchType(true, LaunchPhaseRegular)
|
|
}
|
|
func (s *Sprawl) relaunch(launchPhase LaunchPhase) error {
|
|
return s.launchType(false, launchPhase)
|
|
}
|
|
func (s *Sprawl) launchType(firstTime bool, launchPhase LaunchPhase) (launchErr error) {
|
|
if err := build.DockerImages(s.logger, s.runner, s.topology); err != nil {
|
|
return fmt.Errorf("build.DockerImages: %w", err)
|
|
}
|
|
|
|
if firstTime {
|
|
// Initialize secrets the easy way for now (same in all clusters).
|
|
gossipKey, err := newGossipKey()
|
|
if err != nil {
|
|
return fmt.Errorf("newGossipKey: %w", err)
|
|
}
|
|
for _, cluster := range s.topology.Clusters {
|
|
s.secrets.SaveGeneric(cluster.Name, secrets.BootstrapToken, sharedBootstrapToken)
|
|
s.secrets.SaveGeneric(cluster.Name, secrets.AgentRecovery, sharedAgentRecoveryToken)
|
|
s.secrets.SaveGeneric(cluster.Name, secrets.GossipKey, gossipKey)
|
|
|
|
// Give servers a copy of the bootstrap token for use as their agent tokens
|
|
// to avoid complicating the chicken/egg situation for startup.
|
|
for _, node := range cluster.Nodes {
|
|
if node.IsServer() { // include disabled
|
|
s.secrets.SaveAgentToken(cluster.Name, node.ID(), sharedBootstrapToken)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var cleanupFuncs []func()
|
|
defer func() {
|
|
for i := len(cleanupFuncs) - 1; i >= 0; i-- {
|
|
cleanupFuncs[i]()
|
|
}
|
|
}()
|
|
|
|
if firstTime {
|
|
var err error
|
|
s.generator, err = tfgen.NewGenerator(
|
|
s.logger.Named("tfgen"),
|
|
s.runner,
|
|
s.topology,
|
|
&s.secrets,
|
|
s.workdir,
|
|
s.license,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
s.generator.SetTopology(s.topology)
|
|
}
|
|
cleanupFuncs = append(cleanupFuncs, func() {
|
|
// Log the error before the cleanup so you don't have to wait to see
|
|
// the cause.
|
|
if launchErr != nil {
|
|
s.logger.Error("fatal error during launch", "error", launchErr)
|
|
}
|
|
|
|
_ = s.generator.DestroyAllQuietly()
|
|
})
|
|
|
|
if firstTime {
|
|
// The networking phase is special. We have to pick a random subnet and
|
|
// hope. Once we have this established once it is immutable for future
|
|
// runs.
|
|
if err := s.initNetworkingAndVolumes(); err != nil {
|
|
return fmt.Errorf("initNetworkingAndVolumes: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := s.assignIPAddresses(); err != nil {
|
|
return fmt.Errorf("assignIPAddresses: %w", err)
|
|
}
|
|
|
|
// The previous terraform run should have made the special volume for us.
|
|
if err := s.initTLS(context.TODO()); err != nil {
|
|
return fmt.Errorf("initTLS: %w", err)
|
|
}
|
|
|
|
if firstTime {
|
|
if err := s.createFirstTime(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.generator.MarkLaunched()
|
|
} else {
|
|
if err := s.updateExisting(firstTime, launchPhase); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := s.waitForPeeringEstablishment(); err != nil {
|
|
return fmt.Errorf("waitForPeeringEstablishment: %w", err)
|
|
}
|
|
|
|
if err := s.waitForNetworkAreaEstablishment(); err != nil {
|
|
return fmt.Errorf("waitForNetworkAreaEstablishment: %w", err)
|
|
}
|
|
|
|
cleanupFuncs = nil // reset
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) Stop() error {
|
|
var merr error
|
|
if s.generator != nil {
|
|
if err := s.generator.DestroyAllQuietly(); err != nil {
|
|
merr = multierror.Append(merr, err)
|
|
}
|
|
}
|
|
return merr
|
|
}
|
|
|
|
const dockerOutOfNetworksErrorMessage = `Unable to create network: Error response from daemon: Pool overlaps with other one on this address space`
|
|
|
|
var ErrDockerNetworkCollision = errors.New("could not create one or more docker networks for use due to subnet collision")
|
|
|
|
func (s *Sprawl) initNetworkingAndVolumes() error {
|
|
var lastErr error
|
|
for attempts := 0; attempts < 5; attempts++ {
|
|
err := s.generator.Generate(tfgen.StepNetworks)
|
|
if err != nil && strings.Contains(err.Error(), dockerOutOfNetworksErrorMessage) {
|
|
lastErr = ErrDockerNetworkCollision
|
|
s.logger.Warn(ErrDockerNetworkCollision.Error()+"; retrying", "attempt", attempts+1)
|
|
time.Sleep(1 * time.Second)
|
|
continue
|
|
} else if err != nil {
|
|
return fmt.Errorf("generator[networks]: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
return lastErr
|
|
}
|
|
|
|
func (s *Sprawl) assignIPAddresses() error {
|
|
// assign ips now that we have network ips known to us
|
|
for _, net := range s.topology.Networks {
|
|
if len(net.IPPool) == 0 {
|
|
return fmt.Errorf("network %q does not have any ip assignments", net.Name)
|
|
}
|
|
}
|
|
for _, cluster := range s.topology.Clusters {
|
|
for _, node := range cluster.Nodes {
|
|
for _, addr := range node.Addresses {
|
|
net, ok := s.topology.Networks[addr.Network]
|
|
if !ok {
|
|
return fmt.Errorf("unknown network %q", addr.Network)
|
|
}
|
|
addr.IPAddress = net.IPByIndex(node.Index)
|
|
s.logger.Info("assign addr", "node", node.Name, "addr", addr.IPAddress, "type", addr.Type, "enabled", !node.Disabled)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) initConsulServers() error {
|
|
if err := s.generator.Generate(tfgen.StepServers); err != nil {
|
|
return fmt.Errorf("generator[servers]: %w", err)
|
|
}
|
|
|
|
// s.logger.Info("ALL", "t", jd(s.topology)) // TODO
|
|
|
|
// Create token-less api clients first.
|
|
for _, cluster := range s.topology.Clusters {
|
|
node := cluster.FirstServer()
|
|
|
|
var err error
|
|
s.clients[cluster.Name], err = util.ProxyAPIClient(
|
|
node.LocalProxyPort(),
|
|
node.LocalAddress(),
|
|
8500,
|
|
"", /*no token yet*/
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("error creating initial bootstrap client for cluster=%s: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
|
|
if err := s.rejoinAllConsulServers(); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, cluster := range s.topology.Clusters {
|
|
err := s.bootstrapACLs(cluster.Name)
|
|
if err != nil {
|
|
return fmt.Errorf("bootstrap[%s]: %w", cluster.Name, err)
|
|
}
|
|
|
|
mgmtToken := s.secrets.ReadGeneric(cluster.Name, secrets.BootstrapToken)
|
|
|
|
// Reconfigure the clients to use a management token.
|
|
node := cluster.FirstServer()
|
|
s.clients[cluster.Name], err = util.ProxyAPIClient(
|
|
node.LocalProxyPort(),
|
|
node.LocalAddress(),
|
|
8500,
|
|
mgmtToken,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("error creating final client for cluster=%s: %v", cluster.Name, err)
|
|
}
|
|
|
|
// Connect to gRPC as well.
|
|
if cluster.EnableV2 {
|
|
s.grpcConns[cluster.Name], s.grpcConnCancel[cluster.Name], err = s.dialServerGRPC(cluster, node, mgmtToken)
|
|
if err != nil {
|
|
return fmt.Errorf("error creating gRPC client conn for cluster=%s: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
|
|
// For some reason the grpc resolver stuff for partitions takes some
|
|
// time to get ready.
|
|
s.waitForLocalWrites(cluster, mgmtToken)
|
|
|
|
// Create tenancies so that the ACL tokens and clients have somewhere to go.
|
|
if cluster.Enterprise && node.Images.GreaterThanVersion(topology.MinVersionAgentTokenPartition) {
|
|
if err := s.initTenancies(cluster); err != nil {
|
|
return fmt.Errorf("initTenancies[%s]: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
|
|
if err := s.populateInitialConfigEntries(cluster); err != nil {
|
|
return fmt.Errorf("populateInitialConfigEntries[%s]: %w", cluster.Name, err)
|
|
}
|
|
|
|
if cluster.EnableV2 {
|
|
// Resources are available only in V2
|
|
if err := s.populateInitialResources(cluster); err != nil {
|
|
return fmt.Errorf("populateInitialResources[%s]: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
|
|
if err := s.createAnonymousToken(cluster); err != nil {
|
|
return fmt.Errorf("createAnonymousToken[%s]: %w", cluster.Name, err)
|
|
}
|
|
|
|
if node.Images.GreaterThanVersion(topology.MinVersionAgentTokenPartition) {
|
|
// Create tokens for all of the agents to use for anti-entropy.
|
|
//
|
|
// NOTE: this will cause the servers to roll to pick up the change to
|
|
// the acl{tokens{agent=XXX}}} section.
|
|
if err := s.createAgentTokens(cluster); err != nil {
|
|
return fmt.Errorf("createAgentTokens[%s]: %w", cluster.Name, err)
|
|
}
|
|
} else {
|
|
// Assign agent join policy to the anonymous token
|
|
if err := s.assignAgentJoinPolicyToAnonymousToken(cluster); err != nil {
|
|
return fmt.Errorf("assignAgentJoinPolicyToAnonymousToken[%s]: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) createFirstTime() error {
|
|
if err := s.initConsulServers(); err != nil {
|
|
if err := s.CaptureLogs(context.Background()); err != nil {
|
|
s.logger.Warn("container logs capture encountered failures", "error", err)
|
|
}
|
|
return fmt.Errorf("initConsulServers: %w", err)
|
|
}
|
|
|
|
if err := s.generator.Generate(tfgen.StepAgents); err != nil {
|
|
return fmt.Errorf("generator[agents]: %w", err)
|
|
}
|
|
for _, cluster := range s.topology.Clusters {
|
|
err := retry.Do(
|
|
func() error {
|
|
if err := s.waitForClientAntiEntropyOnce(cluster); err != nil {
|
|
return fmt.Errorf("create first time - waitForClientAntiEntropyOnce[%s]: %w", cluster.Name, err)
|
|
}
|
|
return nil
|
|
},
|
|
retry.MaxDelay(5*time.Second),
|
|
retry.Attempts(15),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("create first time - waitForClientAntiEntropyOnce[%s]: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
|
|
// Ideally we start services WITH a token initially, so we pre-create them
|
|
// before running terraform for them.
|
|
if err := s.createAllWorkloadTokens(); err != nil {
|
|
return fmt.Errorf("createAllWorkloadTokens: %w", err)
|
|
}
|
|
|
|
if err := s.syncAllServicesForDataplaneInstances(); err != nil {
|
|
return fmt.Errorf("syncAllServicesForDataplaneInstances: %w", err)
|
|
}
|
|
|
|
// We can do this ahead, because we've incrementally run terraform as
|
|
// we went.
|
|
if err := s.registerAllServicesToAgents(); err != nil {
|
|
return fmt.Errorf("registerAllServicesToAgents: %w", err)
|
|
}
|
|
|
|
// NOTE: start services WITH token initially
|
|
if err := s.generator.Generate(tfgen.StepServices); err != nil {
|
|
return fmt.Errorf("generator[services]: %w", err)
|
|
}
|
|
|
|
if err := s.initPeerings(); err != nil {
|
|
return fmt.Errorf("initPeerings: %w", err)
|
|
}
|
|
|
|
if err := s.initNetworkAreas(); err != nil {
|
|
return fmt.Errorf("initNetworkAreas: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) updateExisting(firstTime bool, launchPhase LaunchPhase) error {
|
|
if launchPhase != LaunchPhaseUpgrade {
|
|
if err := s.preRegenTasks(); err != nil {
|
|
return fmt.Errorf("preRegenTasks: %w", err)
|
|
}
|
|
} else {
|
|
s.logger.Info("Upgrade - skip preRegenTasks")
|
|
for _, cluster := range s.topology.Clusters {
|
|
if err := s.createAgentTokens(cluster); err != nil {
|
|
return fmt.Errorf("createAgentTokens[%s]: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// We save all of the terraform to the end. Some of the containers will
|
|
// be a little broken until we can do stuff like register services to
|
|
// new agents, which we cannot do until they come up.
|
|
if err := s.generator.Generate(tfgen.StepRelaunch); err != nil {
|
|
return fmt.Errorf("generator[relaunch]: %w", err)
|
|
}
|
|
|
|
if err := s.postRegenTasks(firstTime); err != nil {
|
|
return fmt.Errorf("postRegenTasks: %w", err)
|
|
}
|
|
|
|
// TODO: enforce that peering relationships cannot change
|
|
// TODO: include a fixup version of new peerings?
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) preRegenTasks() error {
|
|
for _, cluster := range s.topology.Clusters {
|
|
// Create tenancies so that the ACL tokens and clients have somewhere to go.
|
|
if cluster.Enterprise {
|
|
if err := s.initTenancies(cluster); err != nil {
|
|
return fmt.Errorf("initTenancies[%s]: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
|
|
if err := s.populateInitialConfigEntries(cluster); err != nil {
|
|
return fmt.Errorf("populateInitialConfigEntries[%s]: %w", cluster.Name, err)
|
|
}
|
|
|
|
// Create tokens for all of the agents to use for anti-entropy.
|
|
if err := s.createAgentTokens(cluster); err != nil {
|
|
return fmt.Errorf("createAgentTokens[%s]: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
|
|
// Ideally we start services WITH a token initially, so we pre-create them
|
|
// before running terraform for them.
|
|
if err := s.createAllWorkloadTokens(); err != nil {
|
|
return fmt.Errorf("createAllWorkloadTokens: %w", err)
|
|
}
|
|
|
|
if err := s.syncAllServicesForDataplaneInstances(); err != nil {
|
|
return fmt.Errorf("syncAllServicesForDataplaneInstances: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) postRegenTasks(firstTime bool) error {
|
|
// rejoinAllConsulServers only for firstTime; otherwise all server agents have retry_join
|
|
if firstTime {
|
|
if err := s.rejoinAllConsulServers(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, cluster := range s.topology.Clusters {
|
|
var err error
|
|
|
|
mgmtToken := s.secrets.ReadGeneric(cluster.Name, secrets.BootstrapToken)
|
|
|
|
// Reconfigure the clients to use a management token.
|
|
node := cluster.FirstServer()
|
|
if node.Disabled {
|
|
continue
|
|
}
|
|
s.clients[cluster.Name], err = util.ProxyAPIClient(
|
|
node.LocalProxyPort(),
|
|
node.LocalAddress(),
|
|
8500,
|
|
mgmtToken,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("error creating final client for cluster=%s: %v", cluster.Name, err)
|
|
}
|
|
|
|
s.waitForLeader(cluster)
|
|
|
|
// For some reason the grpc resolver stuff for partitions takes some
|
|
// time to get ready.
|
|
s.waitForLocalWrites(cluster, mgmtToken)
|
|
}
|
|
|
|
for _, cluster := range s.topology.Clusters {
|
|
if err := s.waitForClientAntiEntropyOnce(cluster); err != nil {
|
|
return fmt.Errorf("post regenerate waitForClientAntiEntropyOnce[%s]: %w", cluster.Name, err)
|
|
}
|
|
}
|
|
|
|
if err := s.registerAllServicesToAgents(); err != nil {
|
|
return fmt.Errorf("registerAllServicesToAgents: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Sprawl) waitForLocalWrites(cluster *topology.Cluster, token string) {
|
|
var (
|
|
client = s.clients[cluster.Name]
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
tryKV := func() error {
|
|
_, err := client.KV().Put(&api.KVPair{
|
|
Key: "local-test",
|
|
Value: []byte("payload-for-local-test-in-" + cluster.Name),
|
|
}, nil)
|
|
return err
|
|
}
|
|
tryAP := func() error {
|
|
if !cluster.Enterprise {
|
|
return nil
|
|
}
|
|
_, _, err := client.Partitions().Create(context.Background(), &api.Partition{
|
|
Name: "placeholder",
|
|
}, &api.WriteOptions{Token: token})
|
|
return err
|
|
}
|
|
|
|
start := time.Now()
|
|
for attempts := 0; ; attempts++ {
|
|
if err := tryKV(); err != nil {
|
|
logger.Debug("local kv write failed; something is not ready yet", "error", err)
|
|
time.Sleep(500 * time.Millisecond)
|
|
continue
|
|
} else {
|
|
dur := time.Since(start)
|
|
logger.Debug("local kv write success", "elapsed", dur, "retries", attempts)
|
|
}
|
|
|
|
break
|
|
}
|
|
|
|
serverNodes := cluster.ServerNodes()
|
|
if cluster.Enterprise && serverNodes[0].Images.GreaterThanVersion(topology.MinVersionAgentTokenPartition) {
|
|
start = time.Now()
|
|
for attempts := 0; ; attempts++ {
|
|
if err := tryAP(); err != nil {
|
|
logger.Debug("local partition write failed; something is not ready yet", "error", err)
|
|
time.Sleep(500 * time.Millisecond)
|
|
continue
|
|
} else {
|
|
dur := time.Since(start)
|
|
logger.Debug("local partition write success", "elapsed", dur, "retries", attempts)
|
|
}
|
|
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Sprawl) waitForClientAntiEntropyOnce(cluster *topology.Cluster) error {
|
|
if cluster.EnableV2 {
|
|
return nil // v1 catalog is disabled when v2 catalog is enabled
|
|
}
|
|
var (
|
|
client = s.clients[cluster.Name]
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
)
|
|
|
|
var (
|
|
queryOptionList = cluster.PartitionQueryOptionsList()
|
|
start = time.Now()
|
|
cc = client.Catalog()
|
|
)
|
|
for {
|
|
// Enumerate all of the nodes that are currently in the catalog. This
|
|
// will overmatch including things like fake nodes for agentless but
|
|
// that's ok.
|
|
current := make(map[topology.NodeID]*api.Node)
|
|
for _, queryOpts := range queryOptionList {
|
|
nodes, _, err := cc.Nodes(queryOpts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, node := range nodes {
|
|
nid := topology.NewNodeID(node.Node, node.Partition)
|
|
current[nid] = node
|
|
}
|
|
}
|
|
|
|
// See if we have them all.
|
|
var stragglers []topology.NodeID
|
|
for _, node := range cluster.Nodes {
|
|
if !node.IsAgent() || node.Disabled {
|
|
continue
|
|
}
|
|
nid := node.CatalogID()
|
|
|
|
got, ok := current[nid]
|
|
if ok && (len(got.TaggedAddresses) > 0 || got.Address != "") {
|
|
// this is a field that is not updated just due to serf reconcile
|
|
continue
|
|
}
|
|
|
|
stragglers = append(stragglers, nid)
|
|
}
|
|
|
|
if len(stragglers) == 0 {
|
|
dur := time.Since(start)
|
|
logger.Debug("all nodes have posted node updates, so first anti-entropy has happened", "elapsed", dur)
|
|
return nil
|
|
}
|
|
logger.Debug("not all nodes have posted node updates yet", "nodes", stragglers)
|
|
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
|
|
func newGossipKey() (string, error) {
|
|
key := make([]byte, 16)
|
|
n, err := rand.Reader.Read(key)
|
|
if err != nil {
|
|
return "", fmt.Errorf("error reading random data: %s", err)
|
|
}
|
|
if n != 16 {
|
|
return "", fmt.Errorf("couldn't read enough entropy. Generate more entropy")
|
|
}
|
|
return base64.StdEncoding.EncodeToString(key), nil
|
|
}
|