|
|
|
@ -21,6 +21,7 @@ import (
|
|
|
|
|
"github.com/hashicorp/consul/testing/deployer/util"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// registerAllServicesToAgents registers services in agent-ful mode
|
|
|
|
|
func (s *Sprawl) registerAllServicesToAgents() error {
|
|
|
|
|
for _, cluster := range s.topology.Clusters {
|
|
|
|
|
if err := s.registerServicesToAgents(cluster); err != nil {
|
|
|
|
@ -30,10 +31,10 @@ func (s *Sprawl) registerAllServicesToAgents() error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Sprawl) registerAllServicesForDataplaneInstances() error {
|
|
|
|
|
func (s *Sprawl) syncAllServicesForDataplaneInstances() error {
|
|
|
|
|
for _, cluster := range s.topology.Clusters {
|
|
|
|
|
if err := s.registerServicesForDataplaneInstances(cluster); err != nil {
|
|
|
|
|
return fmt.Errorf("registerServicesForDataplaneInstances[%s]: %w", cluster.Name, err)
|
|
|
|
|
if err := s.syncServicesForDataplaneInstances(cluster); err != nil {
|
|
|
|
|
return fmt.Errorf("syncServicesForDataplaneInstances[%s]: %w", cluster.Name, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
@ -86,7 +87,7 @@ func (s *Sprawl) registerAgentService(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if svc.IsMeshGateway {
|
|
|
|
|
return nil // handled at startup time for agent-full, but won't be for agent-less
|
|
|
|
|
return nil // handled at startup time for agent-ful, but won't be for agent-less
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
@ -176,94 +177,137 @@ RETRY:
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Sprawl) registerServicesForDataplaneInstances(cluster *topology.Cluster) error {
|
|
|
|
|
// syncServicesForDataplaneInstances register/deregister services in the given cluster
|
|
|
|
|
func (s *Sprawl) syncServicesForDataplaneInstances(cluster *topology.Cluster) error {
|
|
|
|
|
identityInfo := make(map[topology.ServiceID]*Resource[*pbauth.WorkloadIdentity])
|
|
|
|
|
|
|
|
|
|
for _, node := range cluster.Nodes {
|
|
|
|
|
if !node.RunsWorkloads() || len(node.Services) == 0 || node.Disabled {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !node.IsDataplane() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := s.registerCatalogNode(cluster, node); err != nil {
|
|
|
|
|
return fmt.Errorf("error registering virtual node: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, svc := range node.Services {
|
|
|
|
|
if node.IsV2() {
|
|
|
|
|
pending := serviceInstanceToResources(node, svc)
|
|
|
|
|
// registerServiceAtNode is called when node is not disabled
|
|
|
|
|
registerServiceAtNode := func(node *topology.Node, svc *topology.Service) error {
|
|
|
|
|
if node.IsV2() {
|
|
|
|
|
pending := serviceInstanceToResources(node, svc)
|
|
|
|
|
|
|
|
|
|
workloadID := topology.NewServiceID(svc.WorkloadIdentity, svc.ID.Namespace, svc.ID.Partition)
|
|
|
|
|
if _, ok := identityInfo[workloadID]; !ok {
|
|
|
|
|
identityInfo[workloadID] = pending.WorkloadIdentity
|
|
|
|
|
}
|
|
|
|
|
workloadID := topology.NewServiceID(svc.WorkloadIdentity, svc.ID.Namespace, svc.ID.Partition)
|
|
|
|
|
if _, ok := identityInfo[workloadID]; !ok {
|
|
|
|
|
identityInfo[workloadID] = pending.WorkloadIdentity
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write workload
|
|
|
|
|
res, err := pending.Workload.Build()
|
|
|
|
|
// Write workload
|
|
|
|
|
res, err := pending.Workload.Build()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Workload.Resource.Id), err)
|
|
|
|
|
}
|
|
|
|
|
workload, err := s.writeResource(cluster, res)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
// Write check linked to workload
|
|
|
|
|
for _, check := range pending.HealthStatuses {
|
|
|
|
|
check.Resource.Owner = workload.Id
|
|
|
|
|
res, err := check.Build()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Workload.Resource.Id), err)
|
|
|
|
|
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(check.Resource.Id), err)
|
|
|
|
|
}
|
|
|
|
|
workload, err := s.writeResource(cluster, res)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if _, err := s.writeResource(cluster, res); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
// Write check linked to workload
|
|
|
|
|
for _, check := range pending.HealthStatuses {
|
|
|
|
|
check.Resource.Owner = workload.Id
|
|
|
|
|
res, err := check.Build()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(check.Resource.Id), err)
|
|
|
|
|
}
|
|
|
|
|
if _, err := s.writeResource(cluster, res); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// maybe write destinations
|
|
|
|
|
if pending.Destinations != nil {
|
|
|
|
|
res, err := pending.Destinations.Build()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Destinations.Resource.Id), err)
|
|
|
|
|
}
|
|
|
|
|
// maybe write destinations
|
|
|
|
|
if pending.Destinations != nil {
|
|
|
|
|
res, err := pending.Destinations.Build()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.Destinations.Resource.Id), err)
|
|
|
|
|
}
|
|
|
|
|
if _, err := s.writeResource(cluster, res); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if _, err := s.writeResource(cluster, res); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if pending.ProxyConfiguration != nil {
|
|
|
|
|
res, err := pending.ProxyConfiguration.Build()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.ProxyConfiguration.Resource.Id), err)
|
|
|
|
|
}
|
|
|
|
|
if _, err := s.writeResource(cluster, res); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if pending.ProxyConfiguration != nil {
|
|
|
|
|
res, err := pending.ProxyConfiguration.Build()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(pending.ProxyConfiguration.Resource.Id), err)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if err := s.registerCatalogServiceV1(cluster, node, svc); err != nil {
|
|
|
|
|
return fmt.Errorf("error registering service: %w", err)
|
|
|
|
|
if _, err := s.writeResource(cluster, res); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if !svc.DisableServiceMesh {
|
|
|
|
|
if err := s.registerCatalogSidecarServiceV1(cluster, node, svc); err != nil {
|
|
|
|
|
return fmt.Errorf("error registering sidecar service: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if err := s.registerCatalogServiceV1(cluster, node, svc); err != nil {
|
|
|
|
|
return fmt.Errorf("error registering service: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if !svc.DisableServiceMesh {
|
|
|
|
|
if err := s.registerCatalogSidecarServiceV1(cluster, node, svc); err != nil {
|
|
|
|
|
return fmt.Errorf("error registering sidecar service: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, identity := range identityInfo {
|
|
|
|
|
res, err := identity.Build()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(identity.Resource.Id), err)
|
|
|
|
|
// deregisterServiceAtNode is called when node is disabled
|
|
|
|
|
deregisterServiceAtNode := func(node *topology.Node, svc *topology.Service) error {
|
|
|
|
|
if node.IsV2() {
|
|
|
|
|
// TODO: implement deregister services for v2
|
|
|
|
|
panic("deregister services is not implemented for V2")
|
|
|
|
|
} else {
|
|
|
|
|
if err := s.deregisterCatalogServiceV1(cluster, node, svc); err != nil {
|
|
|
|
|
return fmt.Errorf("error deregistering service: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if !svc.DisableServiceMesh {
|
|
|
|
|
if err := s.deregisterCatalogSidecarServiceV1(cluster, node, svc); err != nil {
|
|
|
|
|
return fmt.Errorf("error deregistering sidecar service: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if _, err := s.writeResource(cluster, res); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var syncService func(node *topology.Node, svc *topology.Service) error
|
|
|
|
|
|
|
|
|
|
for _, node := range cluster.Nodes {
|
|
|
|
|
if !node.RunsWorkloads() || len(node.Services) == 0 {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !node.IsDataplane() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Register virtual node service first if node is not disabled
|
|
|
|
|
if !node.Disabled {
|
|
|
|
|
if err := s.registerCatalogNode(cluster, node); err != nil {
|
|
|
|
|
return fmt.Errorf("error registering virtual node: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Register/deregister services on the node
|
|
|
|
|
for _, svc := range node.Services {
|
|
|
|
|
if !node.Disabled {
|
|
|
|
|
syncService = registerServiceAtNode
|
|
|
|
|
} else {
|
|
|
|
|
syncService = deregisterServiceAtNode
|
|
|
|
|
}
|
|
|
|
|
syncService(node, svc)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Deregister the virtual node if node is disabled
|
|
|
|
|
if node.Disabled {
|
|
|
|
|
if err := s.deregisterCatalogNode(cluster, node); err != nil {
|
|
|
|
|
return fmt.Errorf("error deregistering virtual node: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if cluster.EnableV2 {
|
|
|
|
|
for _, identity := range identityInfo {
|
|
|
|
|
res, err := identity.Build()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error serializing resource %s: %w", util.IDToString(identity.Resource.Id), err)
|
|
|
|
|
}
|
|
|
|
|
if _, err := s.writeResource(cluster, res); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for id, svcData := range cluster.Services {
|
|
|
|
|
svcInfo := &Resource[*pbcatalog.Service]{
|
|
|
|
|
Resource: &pbresource.Resource{
|
|
|
|
@ -311,6 +355,16 @@ func (s *Sprawl) registerCatalogNode(
|
|
|
|
|
return s.registerCatalogNodeV1(cluster, node)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Sprawl) deregisterCatalogNode(
|
|
|
|
|
cluster *topology.Cluster,
|
|
|
|
|
node *topology.Node,
|
|
|
|
|
) error {
|
|
|
|
|
if node.IsV2() {
|
|
|
|
|
panic("deregister V2 node is not implemented")
|
|
|
|
|
}
|
|
|
|
|
return s.deregisterCatalogNodeV1(cluster, node)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Sprawl) registerCatalogNodeV2(
|
|
|
|
|
cluster *topology.Cluster,
|
|
|
|
|
node *topology.Node,
|
|
|
|
@ -413,6 +467,82 @@ RETRY:
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Sprawl) deregisterCatalogNodeV1(
|
|
|
|
|
cluster *topology.Cluster,
|
|
|
|
|
node *topology.Node,
|
|
|
|
|
) error {
|
|
|
|
|
if !node.IsDataplane() {
|
|
|
|
|
panic("called wrong method type")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
client = s.clients[cluster.Name]
|
|
|
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
dereg := &api.CatalogDeregistration{
|
|
|
|
|
Node: node.PodName(),
|
|
|
|
|
Address: node.LocalAddress(),
|
|
|
|
|
}
|
|
|
|
|
if cluster.Enterprise {
|
|
|
|
|
dereg.Partition = node.Partition
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// deregister synthetic node
|
|
|
|
|
RETRY:
|
|
|
|
|
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
|
|
|
|
|
if isACLNotFound(err) {
|
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
|
goto RETRY
|
|
|
|
|
}
|
|
|
|
|
return fmt.Errorf("error deregistering virtual node %s: %w", node.ID(), err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.Info("virtual node removed",
|
|
|
|
|
"node", node.ID(),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Sprawl) deregisterCatalogServiceV1(
|
|
|
|
|
cluster *topology.Cluster,
|
|
|
|
|
node *topology.Node,
|
|
|
|
|
svc *topology.Service,
|
|
|
|
|
) error {
|
|
|
|
|
if !node.IsDataplane() {
|
|
|
|
|
panic("called wrong method type")
|
|
|
|
|
}
|
|
|
|
|
if node.IsV2() {
|
|
|
|
|
panic("don't call this")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
client = s.clients[cluster.Name]
|
|
|
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
dereg := &api.CatalogDeregistration{
|
|
|
|
|
Node: node.PodName(),
|
|
|
|
|
ServiceID: svc.ID.Name,
|
|
|
|
|
}
|
|
|
|
|
RETRY:
|
|
|
|
|
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
|
|
|
|
|
if isACLNotFound(err) {
|
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
|
goto RETRY
|
|
|
|
|
}
|
|
|
|
|
return fmt.Errorf("error deregistering service %s at node %s: %w", svc.ID, node.ID(), err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.Info("dataplane service removed",
|
|
|
|
|
"service", svc.ID,
|
|
|
|
|
"node", node.ID(),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Sprawl) registerCatalogServiceV1(
|
|
|
|
|
cluster *topology.Cluster,
|
|
|
|
|
node *topology.Node,
|
|
|
|
@ -449,6 +579,50 @@ RETRY:
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Sprawl) deregisterCatalogSidecarServiceV1(
|
|
|
|
|
cluster *topology.Cluster,
|
|
|
|
|
node *topology.Node,
|
|
|
|
|
svc *topology.Service,
|
|
|
|
|
) error {
|
|
|
|
|
if !node.IsDataplane() {
|
|
|
|
|
panic("called wrong method type")
|
|
|
|
|
}
|
|
|
|
|
if svc.DisableServiceMesh {
|
|
|
|
|
panic("not valid")
|
|
|
|
|
}
|
|
|
|
|
if node.IsV2() {
|
|
|
|
|
panic("don't call this")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
client = s.clients[cluster.Name]
|
|
|
|
|
logger = s.logger.With("cluster", cluster.Name)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
pid := svc.ID
|
|
|
|
|
pid.Name += "-sidecar-proxy"
|
|
|
|
|
dereg := &api.CatalogDeregistration{
|
|
|
|
|
Node: node.PodName(),
|
|
|
|
|
ServiceID: pid.Name,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RETRY:
|
|
|
|
|
if _, err := client.Catalog().Deregister(dereg, nil); err != nil {
|
|
|
|
|
if isACLNotFound(err) {
|
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
|
goto RETRY
|
|
|
|
|
}
|
|
|
|
|
return fmt.Errorf("error deregistering service %s to node %s: %w", svc.ID, node.ID(), err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.Info("dataplane sidecar service removed",
|
|
|
|
|
"service", pid,
|
|
|
|
|
"node", node.ID(),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Sprawl) registerCatalogSidecarServiceV1(
|
|
|
|
|
cluster *topology.Cluster,
|
|
|
|
|
node *topology.Node,
|
|
|
|
|