// Source: consul/testing/deployer/sprawl/peering.go (249 lines, 7.0 KiB, Go)

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package sprawl
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/testing/deployer/topology"
)
// grpcWeirdError is the message fragment used to recognize one specific gRPC
// dial failure.
//
// TODO: this is definitely a grpc resolver/balancer issue to look into
const grpcWeirdError = `transport: Error while dialing failed to find Consul server for global address`

// isWeirdGRPCError reports whether err is non-nil and its message contains
// grpcWeirdError. Callers in this file treat a match as transient: they sleep
// briefly and retry the request.
func isWeirdGRPCError(err error) bool {
	return err != nil && strings.Contains(err.Error(), grpcWeirdError)
}
// initPeerings creates every peering listed in the topology.
//
// For each peering it:
//  1. resolves the dialing and accepting clusters by name,
//  2. waits for the mesh gateways (once per peering),
//  3. generates a peering token on the accepting cluster, and
//  4. establishes the peering from the dialing cluster using that token.
//
// Both API calls are retried without bound, after a 50ms pause, while the
// failure is the known gRPC dial error (see isWeirdGRPCError). Establish is
// additionally retried on an HTTP 504 Gateway Timeout status error.
//
// It returns an error if a peering references a cluster that is not in the
// topology, or if either API call fails with a non-retryable error. The
// underlying API error is wrapped, so callers can inspect it with
// errors.Is / errors.As.
func (s *Sprawl) initPeerings() error {
	// TODO: wait until services are healthy? wait until mesh gateways work?
	// if err := s.generator.Generate(tfgen.StepPeering); err != nil {
	// 	return fmt.Errorf("generator[peering]: %w", err)
	// }

	const retryDelay = 50 * time.Millisecond

	logger := s.logger.Named("peering")

	for _, peering := range s.topology.Peerings {
		dialingCluster, ok := s.topology.Clusters[peering.Dialing.Name]
		if !ok {
			return fmt.Errorf("peering references dialing cluster that does not exist: %s", peering.String())
		}
		acceptingCluster, ok := s.topology.Clusters[peering.Accepting.Name]
		if !ok {
			return fmt.Errorf("peering references accepting cluster that does not exist: %s", peering.String())
		}

		var (
			dialingClient   = s.clients[dialingCluster.Name]
			acceptingClient = s.clients[acceptingCluster.Name]
		)

		// TODO: allow for use of ServerExternalAddresses
		req1 := api.PeeringGenerateTokenRequest{
			PeerName: peering.Accepting.PeerName,
		}
		// The partition is only set for enterprise clusters.
		if acceptingCluster.Enterprise {
			req1.Partition = peering.Accepting.Partition
		}

		s.awaitMeshGateways()

		// Generate the token on the accepting side, retrying only on the
		// known transient gRPC dial error.
		var peeringToken string
		for {
			resp, _, err := acceptingClient.Peerings().GenerateToken(context.Background(), req1, nil)
			if err == nil {
				peeringToken = resp.PeeringToken
				break
			}
			if !isWeirdGRPCError(err) {
				return fmt.Errorf("error generating peering token for %q: %w", peering.String(), err)
			}
			time.Sleep(retryDelay)
		}

		logger.Debug("generated peering token", "peering", peering.String())

		req2 := api.PeeringEstablishRequest{
			PeerName:     peering.Dialing.PeerName,
			PeeringToken: peeringToken,
		}
		if dialingCluster.Enterprise {
			req2.Partition = peering.Dialing.Partition
		}

		logger.Info("registering peering with token", "peering", peering.String())

		// Establish from the dialing side, retrying on the transient gRPC
		// dial error and on gateway timeouts.
		for {
			_, _, err := dialingClient.Peerings().Establish(context.Background(), req2, nil)
			if err == nil {
				break
			}

			// Establish and friends return an api.StatusError value, not
			// pointer, so the errors.As target is a value as well.
			var asStatusError api.StatusError
			retryable := isWeirdGRPCError(err) ||
				(errors.As(err, &asStatusError) && asStatusError.Code == http.StatusGatewayTimeout)
			if !retryable {
				// Wrap with %w (as the token-generation failure above does)
				// so the error chain is preserved for callers.
				return fmt.Errorf("error establishing peering with token for %q: %w", peering.String(), err)
			}
			time.Sleep(retryDelay)
		}

		logger.Info("peering registered", "peering", peering.String())
	}

	return nil
}
// waitForPeeringEstablishment waits for the mesh gateways and then, for each
// peering in the topology, blocks until checkPeeringDirection has returned
// for the dialing side and then for the accepting side.
//
// It returns an error only when a peering names a dialing or accepting
// cluster that is absent from the topology; the waiting itself has no
// timeout in this function.
func (s *Sprawl) waitForPeeringEstablishment() error {
	s.awaitMeshGateways()

	logger := s.logger.Named("peering")
	logger.Info("awaiting peering establishment")

	began := time.Now()

	for _, p := range s.topology.Peerings {
		dialer, found := s.topology.Clusters[p.Dialing.Name]
		if !found {
			return fmt.Errorf("peering references dialing cluster that does not exist: %s", p.String())
		}

		acceptor, found := s.topology.Clusters[p.Accepting.Name]
		if !found {
			return fmt.Errorf("peering references accepting cluster that does not exist: %s", p.String())
		}

		// Resolve the clients and build the per-side loggers up front, before
		// either (blocking) check starts.
		dialerClient := s.clients[dialer.Name]
		acceptorClient := s.clients[acceptor.Name]
		dialerLog := logger.With("cluster", dialer.Name, "peering", p.String())
		acceptorLog := logger.With("cluster", acceptor.Name, "peering", p.String())

		// Each call returns only once that side reports the peering active.
		s.checkPeeringDirection(dialerLog, dialerClient, p.Dialing, dialer.Enterprise)
		s.checkPeeringDirection(acceptorLog, acceptorClient, p.Accepting, acceptor.Enterprise)
	}

	elapsed := time.Since(began).Round(time.Second)
	logger.Info("peering established", "dur", elapsed)

	return nil
}
// checkPeeringDirection polls one side of a peering until it is active.
//
// It repeatedly reads the peering named pc.PeerName through client (scoped to
// pc.Partition when enterprise is true) and returns once the reported state
// is api.PeeringStateActive. Every other outcome is retried after a pause:
// 50ms for the known gRPC dial error, 100ms for any other error or a nil
// result, and 500ms when the peering exists but is not yet active. There is
// no upper bound on the number of attempts.
func (s *Sprawl) checkPeeringDirection(logger hclog.Logger, client *api.Client, pc topology.PeerCluster, enterprise bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	began := time.Now()

	for {
		queryOpts := &api.QueryOptions{}
		// Per-attempt logger carrying the time elapsed so far.
		attemptLog := logger.With("dur", time.Since(began).Round(time.Second))
		if enterprise {
			queryOpts.Partition = pc.Partition
		}

		peer, _, err := client.Peerings().Read(ctx, pc.PeerName, queryOpts)

		var pause time.Duration
		switch {
		case isWeirdGRPCError(err):
			// Known transient dial failure: retry quickly and quietly.
			pause = 50 * time.Millisecond
		case err != nil:
			attemptLog.Debug("error looking up peering", "error", err)
			pause = 100 * time.Millisecond
		case peer == nil:
			attemptLog.Debug("peering not found")
			pause = 100 * time.Millisecond
		case peer.State == api.PeeringStateActive:
			logger.Debug("peering is active", "dur", time.Since(began).Round(time.Second))
			return
		default:
			attemptLog.Debug("peering not active yet", "state", peer.State)
			pause = 500 * time.Millisecond
		}

		time.Sleep(pause)
	}
}
// awaitMeshGateways blocks until every mesh gateway service in the topology
// looks ready.
//
// It collects all services flagged IsMeshGateway across every cluster and
// node, then checks them one at a time. A gateway counts as ready when the
// catalog lists exactly one instance of its service and the health endpoint
// returns exactly one entry for it (the health query passes true for what is
// presumably the API's passing-only filter; confirm against the api package).
// Failed or incomplete lookups are retried every second with no upper bound.
//
// If the catalog ever reports more than one instance of a gateway service,
// the process is terminated via log.Fatalf.
func (s *Sprawl) awaitMeshGateways() {
	began := time.Now()
	s.logger.Info("awaiting mesh gateways")

	// TODO: maybe a better way to do this
	var gateways []*topology.Service
	for _, cluster := range s.topology.Clusters {
		for _, node := range cluster.Nodes {
			for _, svc := range node.Services {
				if !svc.IsMeshGateway {
					continue
				}
				gateways = append(gateways, svc)
			}
		}
	}

	// TODO: parallel
	for _, gw := range gateways {
		client := s.clients[gw.Node.Cluster]
		gwLog := s.logger.With("cluster", gw.Node.Cluster, "sid", gw.ID, "nid", gw.Node.ID())
		gwLog.Info("awaiting MGW readiness")

		// Poll until this gateway passes both the catalog and health checks.
		for {
			// TODO: not sure if there's a better way to check if the MGW is ready
			instances, _, err := client.Catalog().Service(gw.ID.Name, "", &api.QueryOptions{
				Namespace: gw.ID.Namespace,
				Partition: gw.ID.Partition,
			})
			if err != nil {
				gwLog.Debug("fetching MGW service", "err", err)
				time.Sleep(time.Second)
				continue
			}

			switch count := len(instances); {
			case count < 1:
				gwLog.Debug("no MGW service in catalog yet")
				time.Sleep(time.Second)
				continue
			case count > 1:
				// not sure when this would happen
				log.Fatalf("expected 1 MGW service, actually: %#v", instances)
			}

			healthy, _, err := client.Health().Service(gw.ID.Name, "", true, &api.QueryOptions{
				Namespace: gw.ID.Namespace,
				Partition: gw.ID.Partition,
			})
			if err != nil {
				gwLog.Debug("fetching MGW checks", "err", err)
				time.Sleep(time.Second)
				continue
			}
			if len(healthy) != 1 {
				gwLog.Debug("expected 1 MGW entry", "entries", healthy)
				time.Sleep(time.Second)
				continue
			}

			gwLog.Debug("MGW ready", "entry", *(healthy[0]), "dur", time.Since(began).Round(time.Second))
			break
		}
	}

	s.logger.Info("mesh gateways ready", "dur", time.Since(began).Round(time.Second))
}