mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
378 lines
10 KiB
378 lines
10 KiB
// Copyright (c) HashiCorp, Inc. |
|
// SPDX-License-Identifier: BUSL-1.1 |
|
|
|
package pbpeering |
|
|
|
import ( |
|
"crypto/tls" |
|
"crypto/x509" |
|
"errors" |
|
"fmt" |
|
"time" |
|
|
|
"google.golang.org/grpc" |
|
"google.golang.org/grpc/credentials" |
|
"google.golang.org/protobuf/proto" |
|
"google.golang.org/protobuf/types/known/timestamppb" |
|
|
|
"github.com/hashicorp/consul/agent/structs" |
|
"github.com/hashicorp/consul/api" |
|
"github.com/hashicorp/consul/lib" |
|
"github.com/hashicorp/consul/proto/private/pbcommon" |
|
) |
|
|
|
// RequestDatacenter implements structs.RPCInfo |
|
func (req *GenerateTokenRequest) RequestDatacenter() string { |
|
// Cross-datacenter requests are not allowed for peering actions because |
|
// they rely on WAN-federation. |
|
return "" |
|
} |
|
|
|
// RequestDatacenter implements structs.RPCInfo |
|
func (req *EstablishRequest) RequestDatacenter() string { |
|
// Cross-datacenter requests are not allowed for peering actions because |
|
// they rely on WAN-federation. |
|
return "" |
|
} |
|
|
|
// RequestDatacenter implements structs.RPCInfo |
|
func (req *PeeringReadRequest) RequestDatacenter() string { |
|
// Cross-datacenter requests are not allowed for peering actions because |
|
// they rely on WAN-federation. |
|
return "" |
|
} |
|
|
|
// RequestDatacenter implements structs.RPCInfo |
|
func (req *PeeringListRequest) RequestDatacenter() string { |
|
// Cross-datacenter requests are not allowed for peering actions because |
|
// they rely on WAN-federation. |
|
return "" |
|
} |
|
|
|
// RequestDatacenter implements structs.RPCInfo |
|
func (req *PeeringWriteRequest) RequestDatacenter() string { |
|
// Cross-datacenter requests are not allowed for peering actions because |
|
// they rely on WAN-federation. |
|
return "" |
|
} |
|
|
|
// RequestDatacenter implements structs.RPCInfo |
|
func (req *PeeringDeleteRequest) RequestDatacenter() string { |
|
// Cross-datacenter requests are not allowed for peering actions because |
|
// they rely on WAN-federation. |
|
return "" |
|
} |
|
|
|
// RequestDatacenter implements structs.RPCInfo |
|
func (req *TrustBundleReadRequest) RequestDatacenter() string { |
|
// Cross-datacenter requests are not allowed for peering actions because |
|
// they rely on WAN-federation. |
|
return "" |
|
} |
|
|
|
// RequestDatacenter implements structs.RPCInfo |
|
func (req *TrustBundleListByServiceRequest) RequestDatacenter() string { |
|
// Cross-datacenter requests are not allowed for peering actions because |
|
// they rely on WAN-federation. |
|
return "" |
|
} |
|
|
|
// ShouldDial returns true when the peering was stored via the peering initiation endpoint, |
|
// AND the peering is not marked as terminated by our peer. |
|
// If we generated a token for this peer we did not store our server addresses under PeerServerAddresses or ManualServerAddresses. |
|
// These server addresses are for dialing, and only the peer initiating the peering will do the dialing. |
|
func (p *Peering) ShouldDial() bool { |
|
return len(p.PeerServerAddresses) > 0 || len(p.ManualServerAddresses) > 0 |
|
} |
|
|
|
// GetAddressesToDial returns the listing of addresses that should be dialed for the peering. |
|
// It will ensure that manual addresses take precedence, if any are defined. |
|
func (p *Peering) GetAddressesToDial() []string { |
|
if len(p.ManualServerAddresses) > 0 { |
|
return p.ManualServerAddresses |
|
} |
|
return p.PeerServerAddresses |
|
} |
|
|
|
func (x PeeringState) GoString() string { |
|
return x.String() |
|
} |
|
|
|
// ConcatenatedRootPEMs concatenates and returns all PEM-encoded public certificates |
|
// in a peer's trust bundle. |
|
func (b *PeeringTrustBundle) ConcatenatedRootPEMs() string { |
|
if b == nil { |
|
return "" |
|
} |
|
|
|
var rootPEMs string |
|
for _, pem := range b.RootPEMs { |
|
rootPEMs += lib.EnsureTrailingNewline(pem) |
|
} |
|
return rootPEMs |
|
} |
|
|
|
// enumcover:PeeringState |
|
func PeeringStateToAPI(s PeeringState) api.PeeringState { |
|
switch s { |
|
case PeeringState_PENDING: |
|
return api.PeeringStatePending |
|
case PeeringState_ESTABLISHING: |
|
return api.PeeringStateEstablishing |
|
case PeeringState_ACTIVE: |
|
return api.PeeringStateActive |
|
case PeeringState_FAILING: |
|
return api.PeeringStateFailing |
|
case PeeringState_DELETING: |
|
return api.PeeringStateDeleting |
|
case PeeringState_TERMINATED: |
|
return api.PeeringStateTerminated |
|
case PeeringState_UNDEFINED: |
|
fallthrough |
|
default: |
|
return api.PeeringStateUndefined |
|
} |
|
} |
|
|
|
// enumcover:api.PeeringState |
|
func PeeringStateFromAPI(t api.PeeringState) PeeringState { |
|
switch t { |
|
case api.PeeringStatePending: |
|
return PeeringState_PENDING |
|
case api.PeeringStateEstablishing: |
|
return PeeringState_ESTABLISHING |
|
case api.PeeringStateActive: |
|
return PeeringState_ACTIVE |
|
case api.PeeringStateFailing: |
|
return PeeringState_FAILING |
|
case api.PeeringStateDeleting: |
|
return PeeringState_DELETING |
|
case api.PeeringStateTerminated: |
|
return PeeringState_TERMINATED |
|
case api.PeeringStateUndefined: |
|
fallthrough |
|
default: |
|
return PeeringState_UNDEFINED |
|
} |
|
} |
|
|
|
func StreamStatusToAPI(status *StreamStatus) api.PeeringStreamStatus { |
|
return api.PeeringStreamStatus{ |
|
ImportedServices: status.ImportedServices, |
|
ExportedServices: status.ExportedServices, |
|
LastHeartbeat: TimePtrFromProto(status.LastHeartbeat), |
|
LastReceive: TimePtrFromProto(status.LastReceive), |
|
LastSend: TimePtrFromProto(status.LastSend), |
|
} |
|
} |
|
|
|
func StreamStatusFromAPI(status api.PeeringStreamStatus) *StreamStatus { |
|
return &StreamStatus{ |
|
ImportedServices: status.ImportedServices, |
|
ExportedServices: status.ExportedServices, |
|
LastHeartbeat: TimePtrToProto(status.LastHeartbeat), |
|
LastReceive: TimePtrToProto(status.LastReceive), |
|
LastSend: TimePtrToProto(status.LastSend), |
|
} |
|
} |
|
|
|
func (p *Peering) IsActive() bool { |
|
if p == nil || p.State == PeeringState_TERMINATED { |
|
return false |
|
} |
|
if p.DeletedAt == nil { |
|
return true |
|
} |
|
|
|
// The minimum protobuf timestamp is the Unix epoch rather than go's zero. |
|
return structs.IsZeroProtoTime(p.DeletedAt) |
|
} |
|
|
|
// Validate is a validation helper that checks whether a secret ID is embedded in the container type. |
|
func (s *SecretsWriteRequest) Validate() error { |
|
if s.PeerID == "" { |
|
return errors.New("missing peer ID") |
|
} |
|
switch r := s.Request.(type) { |
|
case *SecretsWriteRequest_GenerateToken: |
|
if r != nil && r.GenerateToken.GetEstablishmentSecret() != "" { |
|
return nil |
|
} |
|
case *SecretsWriteRequest_Establish: |
|
if r != nil && r.Establish.GetActiveStreamSecret() != "" { |
|
return nil |
|
} |
|
case *SecretsWriteRequest_ExchangeSecret: |
|
if r != nil && r.ExchangeSecret.GetPendingStreamSecret() != "" { |
|
return nil |
|
} |
|
case *SecretsWriteRequest_PromotePending: |
|
if r != nil && r.PromotePending.GetActiveStreamSecret() != "" { |
|
return nil |
|
} |
|
default: |
|
return fmt.Errorf("unexpected request type %T", s.Request) |
|
} |
|
|
|
return errors.New("missing secret ID") |
|
} |
|
|
|
// TLSDialOption returns the gRPC DialOption to secure the transport if CAPems |
|
// ara available. If no CAPems were provided in the peering token then the |
|
// WithInsecure dial option is returned. |
|
func (p *Peering) TLSDialOption() (grpc.DialOption, error) { |
|
//nolint:staticcheck |
|
tlsOption := grpc.WithInsecure() |
|
|
|
if len(p.PeerCAPems) > 0 { |
|
var haveCerts bool |
|
pool := x509.NewCertPool() |
|
for _, pem := range p.PeerCAPems { |
|
if !pool.AppendCertsFromPEM([]byte(pem)) { |
|
return nil, fmt.Errorf("failed to parse PEM %s", pem) |
|
} |
|
if len(pem) > 0 { |
|
haveCerts = true |
|
} |
|
} |
|
if !haveCerts { |
|
return nil, fmt.Errorf("failed to build cert pool from peer CA pems") |
|
} |
|
cfg := tls.Config{ |
|
ServerName: p.PeerServerName, |
|
RootCAs: pool, |
|
} |
|
tlsOption = grpc.WithTransportCredentials(credentials.NewTLS(&cfg)) |
|
} |
|
return tlsOption, nil |
|
} |
|
|
|
func (p *Peering) ToAPI() *api.Peering { |
|
var t api.Peering |
|
PeeringToAPI(p, &t) |
|
return &t |
|
} |
|
|
|
// TODO consider using mog for this |
|
func (resp *PeeringListResponse) ToAPI() []*api.Peering { |
|
list := make([]*api.Peering, len(resp.Peerings)) |
|
for i, p := range resp.Peerings { |
|
list[i] = p.ToAPI() |
|
} |
|
return list |
|
} |
|
|
|
// TODO consider using mog for this |
|
func (resp *GenerateTokenResponse) ToAPI() *api.PeeringGenerateTokenResponse { |
|
var t api.PeeringGenerateTokenResponse |
|
GenerateTokenResponseToAPI(resp, &t) |
|
return &t |
|
} |
|
|
|
// TODO consider using mog for this |
|
func (resp *EstablishResponse) ToAPI() *api.PeeringEstablishResponse { |
|
var t api.PeeringEstablishResponse |
|
EstablishResponseToAPI(resp, &t) |
|
return &t |
|
} |
|
|
|
func (r *RemoteInfo) IsEmpty() bool { |
|
if r == nil { |
|
return true |
|
} |
|
return r.Partition == "" && r.Datacenter == "" && r.Locality.IsEmpty() |
|
} |
|
|
|
// convenience |
|
func NewGenerateTokenRequestFromAPI(req *api.PeeringGenerateTokenRequest) *GenerateTokenRequest { |
|
if req == nil { |
|
return nil |
|
} |
|
t := &GenerateTokenRequest{} |
|
GenerateTokenRequestFromAPI(req, t) |
|
return t |
|
} |
|
|
|
// convenience |
|
func NewEstablishRequestFromAPI(req *api.PeeringEstablishRequest) *EstablishRequest { |
|
if req == nil { |
|
return nil |
|
} |
|
t := &EstablishRequest{} |
|
EstablishRequestFromAPI(req, t) |
|
return t |
|
} |
|
|
|
func TimePtrFromProto(s *timestamppb.Timestamp) *time.Time { |
|
if s == nil { |
|
return nil |
|
} |
|
t := s.AsTime() |
|
return &t |
|
} |
|
|
|
func TimePtrToProto(s *time.Time) *timestamppb.Timestamp { |
|
if s == nil { |
|
return nil |
|
} |
|
return timestamppb.New(*s) |
|
} |
|
|
|
// DeepCopy returns a copy of the PeeringTrustBundle that can be passed around |
|
// without worrying about the receiver unsafely modifying it. It is used by the |
|
// generated DeepCopy methods in proxycfg. |
|
func (o *PeeringTrustBundle) DeepCopy() *PeeringTrustBundle { |
|
cp, ok := proto.Clone(o).(*PeeringTrustBundle) |
|
if !ok { |
|
panic(fmt.Sprintf("failed to clone *PeeringTrustBundle, got: %T", cp)) |
|
} |
|
return cp |
|
} |
|
|
|
// TODO: handle this with mog |
|
// LocalityToStructs converts a protobuf Locality to a struct Locality. |
|
func LocalityToStructs(l *pbcommon.Locality) *structs.Locality { |
|
if l == nil { |
|
return nil |
|
} |
|
return &structs.Locality{ |
|
Region: l.Region, |
|
Zone: l.Zone, |
|
} |
|
} |
|
|
|
// TODO: handle this with mog |
|
// LocalityFromStructs converts a struct Locality to a protobuf Locality. |
|
func LocalityFromStructs(l *structs.Locality) *pbcommon.Locality { |
|
if l == nil { |
|
return nil |
|
} |
|
return &pbcommon.Locality{ |
|
Region: l.Region, |
|
Zone: l.Zone, |
|
} |
|
} |
|
|
|
// TODO: handle this with mog |
|
// LocalityToAPI converts a protobuf Locality to an API Locality. |
|
func LocalityToAPI(l *pbcommon.Locality) *api.Locality { |
|
if l == nil { |
|
return nil |
|
} |
|
return &api.Locality{ |
|
Region: l.Region, |
|
Zone: l.Zone, |
|
} |
|
} |
|
|
|
// TODO: handle this with mog |
|
// LocalityFromProto converts an API Locality to a protobuf Locality. |
|
func LocalityFromAPI(l *api.Locality) *pbcommon.Locality { |
|
if l == nil { |
|
return nil |
|
} |
|
return &pbcommon.Locality{ |
|
Region: l.Region, |
|
Zone: l.Zone, |
|
} |
|
}
|
|
|