mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
287 lines
7.7 KiB
287 lines
7.7 KiB
package pbpeering |
|
|
|
import ( |
|
"strconv" |
|
"time" |
|
|
|
"github.com/golang/protobuf/ptypes/timestamp" |
|
"github.com/mitchellh/hashstructure" |
|
|
|
"github.com/hashicorp/consul/agent/cache" |
|
"github.com/hashicorp/consul/agent/structs" |
|
"github.com/hashicorp/consul/api" |
|
"github.com/hashicorp/consul/lib" |
|
) |
|
|
|
// TODO(peering): These are byproducts of not embedding |
|
// types in our protobuf definitions and are temporary; |
|
// Hoping to replace them with 1 or 2 methods per request |
|
// using https://github.com/hashicorp/consul/pull/12507 |
|
|
|
// RequestDatacenter implements structs.RPCInfo |
|
func (req *GenerateTokenRequest) RequestDatacenter() string { |
|
return req.Datacenter |
|
} |
|
|
|
// IsRead implements structs.RPCInfo |
|
func (req *GenerateTokenRequest) IsRead() bool { |
|
return false |
|
} |
|
|
|
// AllowStaleRead implements structs.RPCInfo |
|
func (req *GenerateTokenRequest) AllowStaleRead() bool { |
|
return false |
|
} |
|
|
|
// TokenSecret implements structs.RPCInfo |
|
func (req *GenerateTokenRequest) TokenSecret() string { |
|
return req.Token |
|
} |
|
|
|
// SetTokenSecret implements structs.RPCInfo |
|
func (req *GenerateTokenRequest) SetTokenSecret(token string) { |
|
req.Token = token |
|
} |
|
|
|
// HasTimedOut implements structs.RPCInfo |
|
func (req *GenerateTokenRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) { |
|
return time.Since(start) > rpcHoldTimeout, nil |
|
} |
|
|
|
// Timeout implements structs.RPCInfo |
|
func (msg *GenerateTokenRequest) Timeout(rpcHoldTimeout time.Duration, maxQueryTime time.Duration, defaultQueryTime time.Duration) time.Duration { |
|
return rpcHoldTimeout |
|
} |
|
|
|
// IsRead implements structs.RPCInfo |
|
func (req *EstablishRequest) IsRead() bool { |
|
return false |
|
} |
|
|
|
// AllowStaleRead implements structs.RPCInfo |
|
func (req *EstablishRequest) AllowStaleRead() bool { |
|
return false |
|
} |
|
|
|
// TokenSecret implements structs.RPCInfo |
|
func (req *EstablishRequest) TokenSecret() string { |
|
return req.Token |
|
} |
|
|
|
// SetTokenSecret implements structs.RPCInfo |
|
func (req *EstablishRequest) SetTokenSecret(token string) { |
|
req.Token = token |
|
} |
|
|
|
// HasTimedOut implements structs.RPCInfo |
|
func (req *EstablishRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) { |
|
return time.Since(start) > rpcHoldTimeout, nil |
|
} |
|
|
|
// Timeout implements structs.RPCInfo |
|
func (msg *EstablishRequest) Timeout(rpcHoldTimeout time.Duration, maxQueryTime time.Duration, defaultQueryTime time.Duration) time.Duration { |
|
return rpcHoldTimeout |
|
} |
|
|
|
// ShouldDial returns true when the peering was stored via the peering initiation endpoint, |
|
// AND the peering is not marked as terminated by our peer. |
|
// If we generated a token for this peer we did not store our server addresses under PeerServerAddresses. |
|
// These server addresses are for dialing, and only the peer initiating the peering will do the dialing. |
|
func (p *Peering) ShouldDial() bool { |
|
return len(p.PeerServerAddresses) > 0 |
|
} |
|
|
|
func (x PeeringState) GoString() string { |
|
return x.String() |
|
} |
|
|
|
func (r *TrustBundleReadRequest) CacheInfo() cache.RequestInfo { |
|
info := cache.RequestInfo{ |
|
// TODO(peering): Revisit whether this is the token to use once request types accept a token. |
|
Token: r.Token(), |
|
Datacenter: r.Datacenter, |
|
MinIndex: 0, |
|
Timeout: 0, |
|
MustRevalidate: false, |
|
|
|
// TODO(peering): Cache.notifyPollingQuery polls at this interval. We need to revisit how that polling works. |
|
// Using an exponential backoff when the result hasn't changed may be preferable. |
|
MaxAge: 1 * time.Second, |
|
} |
|
|
|
v, err := hashstructure.Hash([]interface{}{ |
|
r.Partition, |
|
r.Name, |
|
}, nil) |
|
if err == nil { |
|
// If there is an error, we don't set the key. A blank key forces |
|
// no cache for this request so the request is forwarded directly |
|
// to the server. |
|
info.Key = strconv.FormatUint(v, 10) |
|
} |
|
|
|
return info |
|
} |
|
|
|
// ConcatenatedRootPEMs concatenates and returns all PEM-encoded public certificates |
|
// in a peer's trust bundle. |
|
func (b *PeeringTrustBundle) ConcatenatedRootPEMs() string { |
|
if b == nil { |
|
return "" |
|
} |
|
|
|
var rootPEMs string |
|
for _, pem := range b.RootPEMs { |
|
rootPEMs += lib.EnsureTrailingNewline(pem) |
|
} |
|
return rootPEMs |
|
} |
|
|
|
// enumcover:PeeringState |
|
func PeeringStateToAPI(s PeeringState) api.PeeringState { |
|
switch s { |
|
case PeeringState_PENDING: |
|
return api.PeeringStatePending |
|
case PeeringState_ESTABLISHING: |
|
return api.PeeringStateEstablishing |
|
case PeeringState_ACTIVE: |
|
return api.PeeringStateActive |
|
case PeeringState_FAILING: |
|
return api.PeeringStateFailing |
|
case PeeringState_DELETING: |
|
return api.PeeringStateDeleting |
|
case PeeringState_TERMINATED: |
|
return api.PeeringStateTerminated |
|
case PeeringState_UNDEFINED: |
|
fallthrough |
|
default: |
|
return api.PeeringStateUndefined |
|
} |
|
} |
|
|
|
// enumcover:api.PeeringState |
|
func PeeringStateFromAPI(t api.PeeringState) PeeringState { |
|
switch t { |
|
case api.PeeringStatePending: |
|
return PeeringState_PENDING |
|
case api.PeeringStateEstablishing: |
|
return PeeringState_ESTABLISHING |
|
case api.PeeringStateActive: |
|
return PeeringState_ACTIVE |
|
case api.PeeringStateFailing: |
|
return PeeringState_FAILING |
|
case api.PeeringStateDeleting: |
|
return PeeringState_DELETING |
|
case api.PeeringStateTerminated: |
|
return PeeringState_TERMINATED |
|
case api.PeeringStateUndefined: |
|
fallthrough |
|
default: |
|
return PeeringState_UNDEFINED |
|
} |
|
} |
|
|
|
func (p *Peering) IsActive() bool { |
|
if p != nil && p.State == PeeringState_TERMINATED { |
|
return false |
|
} |
|
if p == nil || p.DeletedAt == nil { |
|
return true |
|
} |
|
|
|
// The minimum protobuf timestamp is the Unix epoch rather than go's zero. |
|
return structs.IsZeroProtoTime(p.DeletedAt) |
|
} |
|
|
|
func (p *Peering) ToAPI() *api.Peering { |
|
var t api.Peering |
|
PeeringToAPI(p, &t) |
|
return &t |
|
} |
|
|
|
// TODO consider using mog for this |
|
func (resp *PeeringListResponse) ToAPI() []*api.Peering { |
|
list := make([]*api.Peering, len(resp.Peerings)) |
|
for i, p := range resp.Peerings { |
|
list[i] = p.ToAPI() |
|
} |
|
return list |
|
} |
|
|
|
// TODO consider using mog for this |
|
func (resp *GenerateTokenResponse) ToAPI() *api.PeeringGenerateTokenResponse { |
|
var t api.PeeringGenerateTokenResponse |
|
GenerateTokenResponseToAPI(resp, &t) |
|
return &t |
|
} |
|
|
|
// TODO consider using mog for this |
|
func (resp *EstablishResponse) ToAPI() *api.PeeringEstablishResponse { |
|
var t api.PeeringEstablishResponse |
|
EstablishResponseToAPI(resp, &t) |
|
return &t |
|
} |
|
|
|
// convenience |
|
func NewGenerateTokenRequestFromAPI(req *api.PeeringGenerateTokenRequest) *GenerateTokenRequest { |
|
if req == nil { |
|
return nil |
|
} |
|
t := &GenerateTokenRequest{} |
|
GenerateTokenRequestFromAPI(req, t) |
|
return t |
|
} |
|
|
|
// convenience |
|
func NewEstablishRequestFromAPI(req *api.PeeringEstablishRequest) *EstablishRequest { |
|
if req == nil { |
|
return nil |
|
} |
|
t := &EstablishRequest{} |
|
EstablishRequestFromAPI(req, t) |
|
return t |
|
} |
|
|
|
func (r *TrustBundleListByServiceRequest) CacheInfo() cache.RequestInfo { |
|
info := cache.RequestInfo{ |
|
// TODO(peering): Revisit whether this is the token to use once request types accept a token. |
|
Token: r.Token(), |
|
Datacenter: r.Datacenter, |
|
MinIndex: 0, |
|
Timeout: 0, |
|
MustRevalidate: false, |
|
|
|
// TODO(peering): Cache.notifyPollingQuery polls at this interval. We need to revisit how that polling works. |
|
// Using an exponential backoff when the result hasn't changed may be preferable. |
|
MaxAge: 1 * time.Second, |
|
} |
|
|
|
v, err := hashstructure.Hash([]interface{}{ |
|
r.Partition, |
|
r.Namespace, |
|
r.ServiceName, |
|
}, nil) |
|
if err == nil { |
|
// If there is an error, we don't set the key. A blank key forces |
|
// no cache for this request so the request is forwarded directly |
|
// to the server. |
|
info.Key = strconv.FormatUint(v, 10) |
|
} |
|
|
|
return info |
|
} |
|
|
|
func TimePtrFromProto(s *timestamp.Timestamp) *time.Time { |
|
if s == nil { |
|
return nil |
|
} |
|
t := structs.TimeFromProto(s) |
|
return &t |
|
} |
|
|
|
func TimePtrToProto(s *time.Time) *timestamp.Timestamp { |
|
if s == nil { |
|
return nil |
|
} |
|
return structs.TimeToProto(*s) |
|
}
|
|
|