mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
670 lines
16 KiB
670 lines
16 KiB
// Copyright (c) HashiCorp, Inc. |
|
// SPDX-License-Identifier: BUSL-1.1 |
|
|
|
package sprawl |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"net/http" |
|
"time" |
|
|
|
"google.golang.org/protobuf/proto" |
|
"google.golang.org/protobuf/types/known/anypb" |
|
|
|
"github.com/hashicorp/consul/api" |
|
"github.com/hashicorp/consul/proto-public/pbresource" |
|
"github.com/hashicorp/consul/testing/deployer/topology" |
|
"github.com/hashicorp/consul/testing/deployer/util" |
|
) |
|
|
|
// registerAllServicesToAgents registers services in agent-ful mode |
|
func (s *Sprawl) registerAllServicesToAgents() error { |
|
for _, cluster := range s.topology.Clusters { |
|
if err := s.registerServicesToAgents(cluster); err != nil { |
|
return fmt.Errorf("registerServicesToAgents[%s]: %w", cluster.Name, err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (s *Sprawl) syncAllServicesForDataplaneInstances() error { |
|
for _, cluster := range s.topology.Clusters { |
|
if err := s.syncWorkloadsForDataplaneInstances(cluster); err != nil { |
|
return fmt.Errorf("syncWorkloadsForDataplaneInstances[%s]: %w", cluster.Name, err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerServicesToAgents(cluster *topology.Cluster) error { |
|
for _, node := range cluster.Nodes { |
|
if !node.RunsWorkloads() || len(node.Workloads) == 0 || node.Disabled { |
|
continue |
|
} |
|
|
|
if !node.IsAgent() { |
|
continue |
|
} |
|
|
|
agentClient, err := util.ProxyAPIClient( |
|
node.LocalProxyPort(), |
|
node.LocalAddress(), |
|
8500, |
|
"", /*token will be in request*/ |
|
) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
for _, wrk := range node.Workloads { |
|
if err := s.registerAgentService(agentClient, cluster, node, wrk); err != nil { |
|
return err |
|
} |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerAgentService( |
|
agentClient *api.Client, |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
wrk *topology.Workload, |
|
) error { |
|
if !node.IsAgent() { |
|
panic("called wrong method type") |
|
} |
|
|
|
if wrk.IsMeshGateway { |
|
return nil // handled at startup time for agent-ful, but won't be for agent-less |
|
} |
|
|
|
var ( |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
reg := &api.AgentServiceRegistration{ |
|
ID: wrk.ID.Name, |
|
Name: wrk.ID.Name, |
|
Port: wrk.Port, |
|
Meta: wrk.Meta, |
|
} |
|
if cluster.Enterprise { |
|
reg.Namespace = wrk.ID.Namespace |
|
reg.Partition = wrk.ID.Partition |
|
} |
|
|
|
if !wrk.DisableServiceMesh { |
|
var upstreams []api.Upstream |
|
for _, us := range wrk.Upstreams { |
|
uAPI := api.Upstream{ |
|
DestinationPeer: us.Peer, |
|
DestinationName: us.ID.Name, |
|
LocalBindAddress: us.LocalAddress, |
|
LocalBindPort: us.LocalPort, |
|
// Config map[string]interface{} `json:",omitempty" bexpr:"-"` |
|
// MeshGateway MeshGatewayConfig `json:",omitempty"` |
|
} |
|
if cluster.Enterprise { |
|
uAPI.DestinationNamespace = us.ID.Namespace |
|
if us.Peer == "" { |
|
uAPI.DestinationPartition = us.ID.Partition |
|
} |
|
} |
|
upstreams = append(upstreams, uAPI) |
|
} |
|
reg.Connect = &api.AgentServiceConnect{ |
|
SidecarService: &api.AgentServiceRegistration{ |
|
Proxy: &api.AgentServiceConnectProxyConfig{ |
|
Upstreams: upstreams, |
|
}, |
|
}, |
|
} |
|
} |
|
|
|
switch { |
|
case wrk.CheckTCP != "": |
|
chk := &api.AgentServiceCheck{ |
|
Name: "up", |
|
TCP: wrk.CheckTCP, |
|
Interval: "5s", |
|
Timeout: "1s", |
|
} |
|
reg.Checks = append(reg.Checks, chk) |
|
case wrk.CheckHTTP != "": |
|
chk := &api.AgentServiceCheck{ |
|
Name: "up", |
|
HTTP: wrk.CheckHTTP, |
|
Method: "GET", |
|
Interval: "5s", |
|
Timeout: "1s", |
|
} |
|
reg.Checks = append(reg.Checks, chk) |
|
} |
|
|
|
// Switch token for every request. |
|
hdr := make(http.Header) |
|
hdr.Set("X-Consul-Token", s.secrets.ReadWorkloadToken(cluster.Name, wrk.ID)) |
|
agentClient.SetHeaders(hdr) |
|
|
|
RETRY: |
|
if err := agentClient.Agent().ServiceRegister(reg); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("failed to register workload %q to node %q: %w", wrk.ID, node.ID(), err) |
|
} |
|
|
|
logger.Debug("registered workload to client agent", |
|
"workload", wrk.ID.Name, |
|
"node", node.Name, |
|
"namespace", wrk.ID.Namespace, |
|
"partition", wrk.ID.Partition, |
|
) |
|
|
|
return nil |
|
} |
|
|
|
// syncWorkloadsForDataplaneInstances register/deregister services in the given cluster |
|
func (s *Sprawl) syncWorkloadsForDataplaneInstances(cluster *topology.Cluster) error { |
|
// registerWorkloadToNode is called when node is not disabled |
|
registerWorkloadToNode := func(node *topology.Node, wrk *topology.Workload) error { |
|
if err := s.registerCatalogServiceV1(cluster, node, wrk); err != nil { |
|
return fmt.Errorf("error registering service: %w", err) |
|
} |
|
if !wrk.DisableServiceMesh { |
|
if err := s.registerCatalogSidecarServiceV1(cluster, node, wrk); err != nil { |
|
return fmt.Errorf("error registering sidecar service: %w", err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
// deregisterWorkloadFromNode is called when node is disabled |
|
deregisterWorkloadFromNode := func(node *topology.Node, wrk *topology.Workload) error { |
|
if err := s.deregisterCatalogServiceV1(cluster, node, wrk); err != nil { |
|
return fmt.Errorf("error deregistering service: %w", err) |
|
} |
|
if !wrk.DisableServiceMesh { |
|
if err := s.deregisterCatalogSidecarServiceV1(cluster, node, wrk); err != nil { |
|
return fmt.Errorf("error deregistering sidecar service: %w", err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
var syncWorkload func(node *topology.Node, wrk *topology.Workload) error |
|
|
|
for _, node := range cluster.Nodes { |
|
if !node.RunsWorkloads() || len(node.Workloads) == 0 { |
|
continue |
|
} |
|
|
|
if !node.IsDataplane() { |
|
continue |
|
} |
|
|
|
// Register virtual node service first if node is not disabled |
|
if !node.Disabled { |
|
if err := s.registerCatalogNode(cluster, node); err != nil { |
|
return fmt.Errorf("error registering virtual node: %w", err) |
|
} |
|
} |
|
|
|
// Register/deregister services on the node |
|
for _, wrk := range node.Workloads { |
|
if !node.Disabled { |
|
syncWorkload = registerWorkloadToNode |
|
} else { |
|
syncWorkload = deregisterWorkloadFromNode |
|
} |
|
if err := syncWorkload(node, wrk); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
// Deregister the virtual node if node is disabled |
|
if node.Disabled { |
|
if err := s.deregisterCatalogNode(cluster, node); err != nil { |
|
return fmt.Errorf("error deregistering virtual node: %w", err) |
|
} |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerCatalogNode( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
) error { |
|
return s.registerCatalogNodeV1(cluster, node) |
|
} |
|
|
|
func (s *Sprawl) deregisterCatalogNode( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
) error { |
|
return s.deregisterCatalogNodeV1(cluster, node) |
|
} |
|
|
|
func (s *Sprawl) writeResource(cluster *topology.Cluster, res *pbresource.Resource) (*pbresource.Resource, error) { |
|
var ( |
|
client = s.getResourceClient(cluster.Name) |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
ctx := s.getManagementTokenContext(context.Background(), cluster.Name) |
|
RETRY: |
|
wrote, err := client.Write(ctx, &pbresource.WriteRequest{ |
|
Resource: res, |
|
}) |
|
if err != nil { |
|
if isACLNotFound(err) { // TODO: is this right for v2? |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return nil, fmt.Errorf("error creating resource %s: %w", util.IDToString(res.Id), err) |
|
} |
|
|
|
logger.Info("resource upserted", "id", util.IDToString(res.Id)) |
|
return wrote.Resource, nil |
|
} |
|
|
|
func (s *Sprawl) registerCatalogNodeV1( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
) error { |
|
if !node.IsDataplane() { |
|
panic("called wrong method type") |
|
} |
|
|
|
var ( |
|
client = s.clients[cluster.Name] |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
reg := &api.CatalogRegistration{ |
|
Node: node.PodName(), |
|
Address: node.LocalAddress(), |
|
NodeMeta: map[string]string{ |
|
"dataplane-faux": "1", |
|
}, |
|
} |
|
if cluster.Enterprise { |
|
reg.Partition = node.Partition |
|
} |
|
|
|
// register synthetic node |
|
RETRY: |
|
if _, err := client.Catalog().Register(reg, nil); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("error registering virtual node %s: %w", node.ID(), err) |
|
} |
|
|
|
logger.Debug("virtual node created", |
|
"node", node.ID(), |
|
) |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) deregisterCatalogNodeV1( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
) error { |
|
if !node.IsDataplane() { |
|
panic("called wrong method type") |
|
} |
|
|
|
var ( |
|
client = s.clients[cluster.Name] |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
dereg := &api.CatalogDeregistration{ |
|
Node: node.PodName(), |
|
Address: node.LocalAddress(), |
|
} |
|
if cluster.Enterprise { |
|
dereg.Partition = node.Partition |
|
} |
|
|
|
// deregister synthetic node |
|
RETRY: |
|
if _, err := client.Catalog().Deregister(dereg, nil); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("error deregistering virtual node %s: %w", node.ID(), err) |
|
} |
|
|
|
logger.Info("virtual node removed", |
|
"node", node.ID(), |
|
) |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) deregisterCatalogServiceV1( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
wrk *topology.Workload, |
|
) error { |
|
if !node.IsDataplane() { |
|
panic("called wrong method type") |
|
} |
|
|
|
var ( |
|
client = s.clients[cluster.Name] |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
dereg := &api.CatalogDeregistration{ |
|
Node: node.PodName(), |
|
ServiceID: wrk.ID.Name, |
|
} |
|
RETRY: |
|
if _, err := client.Catalog().Deregister(dereg, nil); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("error deregistering service %s at node %s: %w", wrk.ID, node.ID(), err) |
|
} |
|
|
|
logger.Info("dataplane service removed", |
|
"service", wrk.ID, |
|
"node", node.ID(), |
|
) |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerCatalogServiceV1( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
wrk *topology.Workload, |
|
) error { |
|
if !node.IsDataplane() { |
|
panic("called wrong method type") |
|
} |
|
|
|
var ( |
|
client = s.clients[cluster.Name] |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
reg := workloadToCatalogRegistration(cluster, node, wrk) |
|
|
|
RETRY: |
|
if _, err := client.Catalog().Register(reg, nil); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("error registering service %s to node %s: %w", wrk.ID, node.ID(), err) |
|
} |
|
|
|
logger.Debug("dataplane service created", |
|
"service", wrk.ID, |
|
"node", node.ID(), |
|
) |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) deregisterCatalogSidecarServiceV1( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
wrk *topology.Workload, |
|
) error { |
|
if !node.IsDataplane() { |
|
panic("called wrong method type") |
|
} |
|
if wrk.DisableServiceMesh { |
|
panic("not valid") |
|
} |
|
|
|
var ( |
|
client = s.clients[cluster.Name] |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
pid := wrk.ID |
|
pid.Name += "-sidecar-proxy" |
|
dereg := &api.CatalogDeregistration{ |
|
Node: node.PodName(), |
|
ServiceID: pid.Name, |
|
} |
|
|
|
RETRY: |
|
if _, err := client.Catalog().Deregister(dereg, nil); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("error deregistering service %s to node %s: %w", wrk.ID, node.ID(), err) |
|
} |
|
|
|
logger.Info("dataplane sidecar service removed", |
|
"service", pid, |
|
"node", node.ID(), |
|
) |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerCatalogSidecarServiceV1( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
wrk *topology.Workload, |
|
) error { |
|
if !node.IsDataplane() { |
|
panic("called wrong method type") |
|
} |
|
if wrk.DisableServiceMesh { |
|
panic("not valid") |
|
} |
|
|
|
var ( |
|
client = s.clients[cluster.Name] |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
pid, reg := workloadToSidecarCatalogRegistration(cluster, node, wrk) |
|
RETRY: |
|
if _, err := client.Catalog().Register(reg, nil); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("error registering service %s to node %s: %w", wrk.ID, node.ID(), err) |
|
} |
|
|
|
logger.Debug("dataplane sidecar service created", |
|
"service", pid, |
|
"node", node.ID(), |
|
) |
|
|
|
return nil |
|
} |
|
|
|
type Resource[V proto.Message] struct { |
|
Resource *pbresource.Resource |
|
Data V |
|
} |
|
|
|
func (r *Resource[V]) Build() (*pbresource.Resource, error) { |
|
anyData, err := anypb.New(r.Data) |
|
if err != nil { |
|
return nil, err |
|
} |
|
r.Resource.Data = anyData |
|
return r.Resource, nil |
|
} |
|
|
|
func workloadToCatalogRegistration( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
wrk *topology.Workload, |
|
) *api.CatalogRegistration { |
|
reg := &api.CatalogRegistration{ |
|
Node: node.PodName(), |
|
SkipNodeUpdate: true, |
|
Service: &api.AgentService{ |
|
Kind: api.ServiceKindTypical, |
|
ID: wrk.ID.Name, |
|
Service: wrk.ID.Name, |
|
Meta: wrk.Meta, |
|
Port: wrk.Port, |
|
Address: node.LocalAddress(), |
|
}, |
|
} |
|
if wrk.IsMeshGateway { |
|
reg.Service.Kind = api.ServiceKindMeshGateway |
|
reg.Service.Proxy = &api.AgentServiceConnectProxyConfig{ |
|
Config: map[string]interface{}{ |
|
"envoy_gateway_no_default_bind": true, |
|
"envoy_gateway_bind_tagged_addresses": true, |
|
}, |
|
MeshGateway: api.MeshGatewayConfig{ |
|
Mode: api.MeshGatewayModeLocal, |
|
}, |
|
} |
|
} |
|
if node.HasPublicAddress() { |
|
reg.TaggedAddresses = map[string]string{ |
|
"lan": node.LocalAddress(), |
|
"lan_ipv4": node.LocalAddress(), |
|
"wan": node.PublicAddress(), |
|
"wan_ipv4": node.PublicAddress(), |
|
} |
|
// TODO: not sure what the difference is between these, but with just the |
|
// top-level set, it appeared to not get set in either :/ |
|
reg.Service.TaggedAddresses = map[string]api.ServiceAddress{ |
|
"lan": { |
|
Address: node.LocalAddress(), |
|
Port: wrk.Port, |
|
}, |
|
"lan_ipv4": { |
|
Address: node.LocalAddress(), |
|
Port: wrk.Port, |
|
}, |
|
"wan": { |
|
Address: node.PublicAddress(), |
|
Port: wrk.Port, |
|
}, |
|
"wan_ipv4": { |
|
Address: node.PublicAddress(), |
|
Port: wrk.Port, |
|
}, |
|
} |
|
} |
|
if cluster.Enterprise { |
|
reg.Partition = wrk.ID.Partition |
|
reg.Service.Namespace = wrk.ID.Namespace |
|
reg.Service.Partition = wrk.ID.Partition |
|
} |
|
|
|
if wrk.HasCheck() { |
|
chk := &api.HealthCheck{ |
|
Name: "external sync", |
|
// Type: "external-sync", |
|
Status: "passing", // TODO |
|
ServiceID: wrk.ID.Name, |
|
ServiceName: wrk.ID.Name, |
|
Output: "", |
|
} |
|
if cluster.Enterprise { |
|
chk.Namespace = wrk.ID.Namespace |
|
chk.Partition = wrk.ID.Partition |
|
} |
|
switch { |
|
case wrk.CheckTCP != "": |
|
chk.Definition.TCP = wrk.CheckTCP |
|
case wrk.CheckHTTP != "": |
|
chk.Definition.HTTP = wrk.CheckHTTP |
|
chk.Definition.Method = "GET" |
|
} |
|
reg.Checks = append(reg.Checks, chk) |
|
} |
|
return reg |
|
} |
|
|
|
func workloadToSidecarCatalogRegistration( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
wrk *topology.Workload, |
|
) (topology.ID, *api.CatalogRegistration) { |
|
pid := wrk.ID |
|
pid.Name += "-sidecar-proxy" |
|
reg := &api.CatalogRegistration{ |
|
Node: node.PodName(), |
|
SkipNodeUpdate: true, |
|
Service: &api.AgentService{ |
|
Kind: api.ServiceKindConnectProxy, |
|
ID: pid.Name, |
|
Service: pid.Name, |
|
Meta: wrk.Meta, |
|
Port: wrk.EnvoyPublicListenerPort, |
|
Address: node.LocalAddress(), |
|
Proxy: &api.AgentServiceConnectProxyConfig{ |
|
DestinationServiceName: wrk.ID.Name, |
|
DestinationServiceID: wrk.ID.Name, |
|
LocalServicePort: wrk.Port, |
|
}, |
|
}, |
|
Checks: []*api.HealthCheck{{ |
|
Name: "external sync", |
|
// Type: "external-sync", |
|
Status: "passing", // TODO |
|
ServiceID: pid.Name, |
|
ServiceName: pid.Name, |
|
Definition: api.HealthCheckDefinition{ |
|
TCP: fmt.Sprintf("%s:%d", node.LocalAddress(), wrk.EnvoyPublicListenerPort), |
|
}, |
|
Output: "", |
|
}}, |
|
} |
|
if node.HasPublicAddress() { |
|
reg.TaggedAddresses = map[string]string{ |
|
"lan": node.LocalAddress(), |
|
"lan_ipv4": node.LocalAddress(), |
|
"wan": node.PublicAddress(), |
|
"wan_ipv4": node.PublicAddress(), |
|
} |
|
} |
|
if cluster.Enterprise { |
|
reg.Partition = pid.Partition |
|
reg.Service.Namespace = pid.Namespace |
|
reg.Service.Partition = pid.Partition |
|
reg.Checks[0].Namespace = pid.Namespace |
|
reg.Checks[0].Partition = pid.Partition |
|
} |
|
|
|
for _, us := range wrk.Upstreams { |
|
pu := api.Upstream{ |
|
DestinationName: us.ID.Name, |
|
DestinationPeer: us.Peer, |
|
LocalBindAddress: us.LocalAddress, |
|
LocalBindPort: us.LocalPort, |
|
} |
|
if cluster.Enterprise { |
|
pu.DestinationNamespace = us.ID.Namespace |
|
if us.Peer == "" { |
|
pu.DestinationPartition = us.ID.Partition |
|
} |
|
} |
|
reg.Service.Proxy.Upstreams = append(reg.Service.Proxy.Upstreams, pu) |
|
} |
|
|
|
return pid, reg |
|
}
|
|
|