mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
460 lines
11 KiB
460 lines
11 KiB
// Copyright (c) HashiCorp, Inc. |
|
// SPDX-License-Identifier: BUSL-1.1 |
|
|
|
package sprawl |
|
|
|
import ( |
|
"fmt" |
|
"net/http" |
|
"time" |
|
|
|
"github.com/hashicorp/consul/api" |
|
|
|
"github.com/hashicorp/consul/testing/deployer/topology" |
|
"github.com/hashicorp/consul/testing/deployer/util" |
|
) |
|
|
|
func (s *Sprawl) registerAllServicesToAgents() error { |
|
for _, cluster := range s.topology.Clusters { |
|
if err := s.registerServicesToAgents(cluster); err != nil { |
|
return fmt.Errorf("registerServicesToAgents[%s]: %w", cluster.Name, err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerAllServicesForDataplaneInstances() error { |
|
for _, cluster := range s.topology.Clusters { |
|
if err := s.registerServicesForDataplaneInstances(cluster); err != nil { |
|
return fmt.Errorf("registerServicesForDataplaneInstances[%s]: %w", cluster.Name, err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerServicesToAgents(cluster *topology.Cluster) error { |
|
for _, node := range cluster.Nodes { |
|
if !node.RunsWorkloads() || len(node.Services) == 0 || node.Disabled { |
|
continue |
|
} |
|
|
|
if !node.IsAgent() { |
|
continue |
|
} |
|
|
|
agentClient, err := util.ProxyAPIClient( |
|
node.LocalProxyPort(), |
|
node.LocalAddress(), |
|
8500, |
|
"", /*token will be in request*/ |
|
) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
for _, svc := range node.Services { |
|
if err := s.registerAgentService(agentClient, cluster, node, svc); err != nil { |
|
return err |
|
} |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerAgentService( |
|
agentClient *api.Client, |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
svc *topology.Service, |
|
) error { |
|
if !node.IsAgent() { |
|
panic("called wrong method type") |
|
} |
|
|
|
if svc.IsMeshGateway { |
|
return nil // handled at startup time for agent-full, but won't be for agent-less |
|
} |
|
|
|
var ( |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
reg := &api.AgentServiceRegistration{ |
|
ID: svc.ID.Name, |
|
Name: svc.ID.Name, |
|
Port: svc.Port, |
|
Meta: svc.Meta, |
|
} |
|
if cluster.Enterprise { |
|
reg.Namespace = svc.ID.Namespace |
|
reg.Partition = svc.ID.Partition |
|
} |
|
|
|
if !svc.DisableServiceMesh { |
|
var upstreams []api.Upstream |
|
for _, u := range svc.Upstreams { |
|
uAPI := api.Upstream{ |
|
DestinationPeer: u.Peer, |
|
DestinationName: u.ID.Name, |
|
LocalBindAddress: u.LocalAddress, |
|
LocalBindPort: u.LocalPort, |
|
// Config map[string]interface{} `json:",omitempty" bexpr:"-"` |
|
// MeshGateway MeshGatewayConfig `json:",omitempty"` |
|
} |
|
if cluster.Enterprise { |
|
uAPI.DestinationNamespace = u.ID.Namespace |
|
if u.Peer == "" { |
|
uAPI.DestinationPartition = u.ID.Partition |
|
} |
|
} |
|
upstreams = append(upstreams, uAPI) |
|
} |
|
reg.Connect = &api.AgentServiceConnect{ |
|
SidecarService: &api.AgentServiceRegistration{ |
|
Proxy: &api.AgentServiceConnectProxyConfig{ |
|
Upstreams: upstreams, |
|
}, |
|
}, |
|
} |
|
} |
|
|
|
switch { |
|
case svc.CheckTCP != "": |
|
chk := &api.AgentServiceCheck{ |
|
Name: "up", |
|
TCP: svc.CheckTCP, |
|
Interval: "5s", |
|
Timeout: "1s", |
|
} |
|
reg.Checks = append(reg.Checks, chk) |
|
case svc.CheckHTTP != "": |
|
chk := &api.AgentServiceCheck{ |
|
Name: "up", |
|
HTTP: svc.CheckHTTP, |
|
Method: "GET", |
|
Interval: "5s", |
|
Timeout: "1s", |
|
} |
|
reg.Checks = append(reg.Checks, chk) |
|
} |
|
|
|
// Switch token for every request. |
|
hdr := make(http.Header) |
|
hdr.Set("X-Consul-Token", s.secrets.ReadServiceToken(cluster.Name, svc.ID)) |
|
agentClient.SetHeaders(hdr) |
|
|
|
RETRY: |
|
if err := agentClient.Agent().ServiceRegister(reg); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("failed to register service %q to node %q: %w", svc.ID, node.ID(), err) |
|
} |
|
|
|
logger.Info("registered service to client agent", |
|
"service", svc.ID.Name, |
|
"node", node.Name, |
|
"namespace", svc.ID.Namespace, |
|
"partition", svc.ID.Partition, |
|
) |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster) error { |
|
for _, node := range cluster.Nodes { |
|
if !node.RunsWorkloads() || len(node.Services) == 0 || node.Disabled { |
|
continue |
|
} |
|
|
|
if !node.IsDataplane() { |
|
continue |
|
} |
|
|
|
if err := s.registerCatalogNode(cluster, node); err != nil { |
|
return fmt.Errorf("error registering virtual node: %w", err) |
|
} |
|
|
|
for _, svc := range node.Services { |
|
if err := s.registerCatalogService(cluster, node, svc); err != nil { |
|
return fmt.Errorf("error registering service: %w", err) |
|
} |
|
if !svc.DisableServiceMesh { |
|
if err := s.registerCatalogSidecarService(cluster, node, svc); err != nil { |
|
return fmt.Errorf("error registering sidecar service: %w", err) |
|
} |
|
} |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerCatalogNode( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
) error { |
|
if !node.IsDataplane() { |
|
panic("called wrong method type") |
|
} |
|
|
|
var ( |
|
client = s.clients[cluster.Name] |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
reg := &api.CatalogRegistration{ |
|
Node: node.PodName(), |
|
Address: node.LocalAddress(), |
|
NodeMeta: map[string]string{ |
|
"dataplane-faux": "1", |
|
}, |
|
} |
|
if cluster.Enterprise { |
|
reg.Partition = node.Partition |
|
} |
|
|
|
// register synthetic node |
|
RETRY: |
|
if _, err := client.Catalog().Register(reg, nil); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("error registering virtual node %s: %w", node.ID(), err) |
|
} |
|
|
|
logger.Info("virtual node created", |
|
"node", node.ID(), |
|
) |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerCatalogService( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
svc *topology.Service, |
|
) error { |
|
if !node.IsDataplane() { |
|
panic("called wrong method type") |
|
} |
|
|
|
var ( |
|
client = s.clients[cluster.Name] |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
reg := serviceToCatalogRegistration(cluster, node, svc) |
|
|
|
RETRY: |
|
if _, err := client.Catalog().Register(reg, nil); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("error registering service %s to node %s: %w", svc.ID, node.ID(), err) |
|
} |
|
|
|
logger.Info("dataplane service created", |
|
"service", svc.ID, |
|
"node", node.ID(), |
|
) |
|
|
|
return nil |
|
} |
|
|
|
func (s *Sprawl) registerCatalogSidecarService( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
svc *topology.Service, |
|
) error { |
|
if !node.IsDataplane() { |
|
panic("called wrong method type") |
|
} |
|
if svc.DisableServiceMesh { |
|
panic("not valid") |
|
} |
|
|
|
var ( |
|
client = s.clients[cluster.Name] |
|
logger = s.logger.With("cluster", cluster.Name) |
|
) |
|
|
|
pid, reg := serviceToSidecarCatalogRegistration(cluster, node, svc) |
|
RETRY: |
|
if _, err := client.Catalog().Register(reg, nil); err != nil { |
|
if isACLNotFound(err) { |
|
time.Sleep(50 * time.Millisecond) |
|
goto RETRY |
|
} |
|
return fmt.Errorf("error registering service %s to node %s: %w", svc.ID, node.ID(), err) |
|
} |
|
|
|
logger.Info("dataplane sidecar service created", |
|
"service", pid, |
|
"node", node.ID(), |
|
) |
|
|
|
return nil |
|
} |
|
|
|
func serviceToCatalogRegistration( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
svc *topology.Service, |
|
) *api.CatalogRegistration { |
|
reg := &api.CatalogRegistration{ |
|
Node: node.PodName(), |
|
SkipNodeUpdate: true, |
|
Service: &api.AgentService{ |
|
Kind: api.ServiceKindTypical, |
|
ID: svc.ID.Name, |
|
Service: svc.ID.Name, |
|
Meta: svc.Meta, |
|
Port: svc.Port, |
|
Address: node.LocalAddress(), |
|
}, |
|
} |
|
if svc.IsMeshGateway { |
|
reg.Service.Kind = api.ServiceKindMeshGateway |
|
reg.Service.Proxy = &api.AgentServiceConnectProxyConfig{ |
|
Config: map[string]interface{}{ |
|
"envoy_gateway_no_default_bind": true, |
|
"envoy_gateway_bind_tagged_addresses": true, |
|
}, |
|
MeshGateway: api.MeshGatewayConfig{ |
|
Mode: api.MeshGatewayModeLocal, |
|
}, |
|
} |
|
} |
|
if node.HasPublicAddress() { |
|
reg.TaggedAddresses = map[string]string{ |
|
"lan": node.LocalAddress(), |
|
"lan_ipv4": node.LocalAddress(), |
|
"wan": node.PublicAddress(), |
|
"wan_ipv4": node.PublicAddress(), |
|
} |
|
// TODO: not sure what the difference is between these, but with just the |
|
// top-level set, it appeared to not get set in either :/ |
|
reg.Service.TaggedAddresses = map[string]api.ServiceAddress{ |
|
"lan": { |
|
Address: node.LocalAddress(), |
|
Port: svc.Port, |
|
}, |
|
"lan_ipv4": { |
|
Address: node.LocalAddress(), |
|
Port: svc.Port, |
|
}, |
|
"wan": { |
|
Address: node.PublicAddress(), |
|
Port: svc.Port, |
|
}, |
|
"wan_ipv4": { |
|
Address: node.PublicAddress(), |
|
Port: svc.Port, |
|
}, |
|
} |
|
} |
|
if cluster.Enterprise { |
|
reg.Partition = svc.ID.Partition |
|
reg.Service.Namespace = svc.ID.Namespace |
|
reg.Service.Partition = svc.ID.Partition |
|
} |
|
|
|
if svc.HasCheck() { |
|
chk := &api.HealthCheck{ |
|
Name: "external sync", |
|
// Type: "external-sync", |
|
Status: "passing", // TODO |
|
ServiceID: svc.ID.Name, |
|
ServiceName: svc.ID.Name, |
|
Output: "", |
|
} |
|
if cluster.Enterprise { |
|
chk.Namespace = svc.ID.Namespace |
|
chk.Partition = svc.ID.Partition |
|
} |
|
switch { |
|
case svc.CheckTCP != "": |
|
chk.Definition.TCP = svc.CheckTCP |
|
case svc.CheckHTTP != "": |
|
chk.Definition.HTTP = svc.CheckHTTP |
|
chk.Definition.Method = "GET" |
|
} |
|
reg.Checks = append(reg.Checks, chk) |
|
} |
|
return reg |
|
} |
|
|
|
func serviceToSidecarCatalogRegistration( |
|
cluster *topology.Cluster, |
|
node *topology.Node, |
|
svc *topology.Service, |
|
) (topology.ServiceID, *api.CatalogRegistration) { |
|
pid := svc.ID |
|
pid.Name += "-sidecar-proxy" |
|
reg := &api.CatalogRegistration{ |
|
Node: node.PodName(), |
|
SkipNodeUpdate: true, |
|
Service: &api.AgentService{ |
|
Kind: api.ServiceKindConnectProxy, |
|
ID: pid.Name, |
|
Service: pid.Name, |
|
Meta: svc.Meta, |
|
Port: svc.EnvoyPublicListenerPort, |
|
Address: node.LocalAddress(), |
|
Proxy: &api.AgentServiceConnectProxyConfig{ |
|
DestinationServiceName: svc.ID.Name, |
|
DestinationServiceID: svc.ID.Name, |
|
LocalServicePort: svc.Port, |
|
}, |
|
}, |
|
Checks: []*api.HealthCheck{{ |
|
Name: "external sync", |
|
// Type: "external-sync", |
|
Status: "passing", // TODO |
|
ServiceID: pid.Name, |
|
ServiceName: pid.Name, |
|
Definition: api.HealthCheckDefinition{ |
|
TCP: fmt.Sprintf("%s:%d", node.LocalAddress(), svc.EnvoyPublicListenerPort), |
|
}, |
|
Output: "", |
|
}}, |
|
} |
|
if node.HasPublicAddress() { |
|
reg.TaggedAddresses = map[string]string{ |
|
"lan": node.LocalAddress(), |
|
"lan_ipv4": node.LocalAddress(), |
|
"wan": node.PublicAddress(), |
|
"wan_ipv4": node.PublicAddress(), |
|
} |
|
} |
|
if cluster.Enterprise { |
|
reg.Partition = pid.Partition |
|
reg.Service.Namespace = pid.Namespace |
|
reg.Service.Partition = pid.Partition |
|
reg.Checks[0].Namespace = pid.Namespace |
|
reg.Checks[0].Partition = pid.Partition |
|
} |
|
|
|
for _, u := range svc.Upstreams { |
|
pu := api.Upstream{ |
|
DestinationName: u.ID.Name, |
|
DestinationPeer: u.Peer, |
|
LocalBindAddress: u.LocalAddress, |
|
LocalBindPort: u.LocalPort, |
|
} |
|
if cluster.Enterprise { |
|
pu.DestinationNamespace = u.ID.Namespace |
|
if u.Peer == "" { |
|
pu.DestinationPartition = u.ID.Partition |
|
} |
|
} |
|
reg.Service.Proxy.Upstreams = append(reg.Service.Proxy.Upstreams, pu) |
|
} |
|
|
|
return pid, reg |
|
}
|
|
|