package proxycfg

import (
	"context"
	"errors"
	"fmt"
	"log"
	"reflect"
	"strings"
	"time"

	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/mitchellh/copystructure"
)

const (
	coalesceTimeout                  = 200 * time.Millisecond
	rootsWatchID                     = "roots"
	leafWatchID                      = "leaf"
	intentionsWatchID                = "intentions"
	serviceIDPrefix                  = string(structs.UpstreamDestTypeService) + ":"
	preparedQueryIDPrefix            = string(structs.UpstreamDestTypePreparedQuery) + ":"
	defaultPreparedQueryPollInterval = 30 * time.Second
)

// state holds all the state needed to maintain the config for a registered
// connect-proxy service. When a proxy registration is changed, the entire
// state is discarded and a new one created.
type state struct {
	// logger, source and cache are required to be set before calling Watch.
	logger *log.Logger
	source *structs.QuerySource
	cache  *cache.Cache

	// ctx and cancel store the context created during initWatches call
	ctx    context.Context
	cancel func()

	// proxyID, address, port, proxyCfg and token are copied from the proxy
	// registration so the run goroutine never shares memory with the caller.
	proxyID  string
	address  string
	port     int
	proxyCfg structs.ConnectProxyConfig
	token    string

	// ch receives raw cache update events from all registered watches.
	ch chan cache.UpdateEvent
	// snapCh delivers coalesced ConfigSnapshots to the Watch caller.
	snapCh chan ConfigSnapshot
	// reqCh accepts requests for the current snapshot (see CurrentSnapshot).
	reqCh chan chan *ConfigSnapshot
}

// newState populates the state struct by copying relevant fields from the
// NodeService and token. We copy so that we can use them in a separate
// goroutine later without reasoning about races with the NodeService passed
// in (especially for embedded fields like maps and slices).
//
// The returned state needs its required dependencies to be set before Watch
// can be called.
func newState(ns *structs.NodeService, token string) (*state, error) {
	if ns.Kind != structs.ServiceKindConnectProxy {
		return nil, errors.New("not a connect-proxy")
	}

	// Copy the config map
	proxyCfgRaw, err := copystructure.Copy(ns.Proxy)
	if err != nil {
		return nil, err
	}
	proxyCfg, ok := proxyCfgRaw.(structs.ConnectProxyConfig)
	if !ok {
		return nil, errors.New("failed to copy proxy config")
	}

	return &state{
		proxyID:  ns.ID,
		address:  ns.Address,
		port:     ns.Port,
		proxyCfg: proxyCfg,
		token:    token,

		// The buffer size of 10 is fairly arbitrary: it allows the 3
		// mandatory watches plus a reasonable number of upstream watches to
		// all deliver their initial messages in parallel without blocking
		// the cache.Notify loops. Blocking briefly with larger numbers of
		// upstreams is not a correctness problem, so we don't need to be
		// more conservative; this just gives some headroom so that normal
		// operation is non-blocking in most typical cases.
		ch:     make(chan cache.UpdateEvent, 10),
		snapCh: make(chan ConfigSnapshot, 1),
		reqCh:  make(chan chan *ConfigSnapshot, 1),
	}, nil
}

// Watch initializes watches on all necessary cache data for the current
// proxy registration state and returns a chan on which to observe updates to
// the ConfigSnapshot that contains all necessary config state. The chan is
// closed when the state is Closed.
func (s *state) Watch() (<-chan ConfigSnapshot, error) {
	s.ctx, s.cancel = context.WithCancel(context.Background())

	err := s.initWatches()
	if err != nil {
		s.cancel()
		return nil, err
	}

	go s.run()

	return s.snapCh, nil
}
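
// exampleWatchLifecycle is a minimal, illustrative sketch (not called
// anywhere in this package; the caller-supplied deps are assumed to come
// from the agent) of the intended lifecycle: construct state from a
// registration, set the required dependencies, start the watches, consume a
// snapshot, then shut down.
func exampleWatchLifecycle(ns *structs.NodeService, token string,
	logger *log.Logger, source *structs.QuerySource, c *cache.Cache) error {

	s, err := newState(ns, token)
	if err != nil {
		return err
	}

	// These three dependencies must be set before Watch is called.
	s.logger = logger
	s.source = source
	s.cache = c

	snapCh, err := s.Watch()
	if err != nil {
		return err
	}

	// Block for the first coalesced snapshot, then stop the watches. The
	// channel is closed once Close cancels the state's context.
	if snap, ok := <-snapCh; ok {
		logger.Printf("[DEBUG] got first config snapshot for proxy %s", snap.ProxyID)
	}
	return s.Close()
}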

// Close discards the state and stops any long-running watches.
func (s *state) Close() error {
	if s.cancel != nil {
		s.cancel()
	}
	return nil
}

// initWatches sets up the watches needed based on current proxy registration
// state.
func (s *state) initWatches() error {
	// Watch for root changes
	err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
		Datacenter:   s.source.Datacenter,
		QueryOptions: structs.QueryOptions{Token: s.token},
	}, rootsWatchID, s.ch)
	if err != nil {
		return err
	}

	// Watch the leaf cert
	err = s.cache.Notify(s.ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
		Datacenter: s.source.Datacenter,
		Token:      s.token,
		Service:    s.proxyCfg.DestinationServiceName,
	}, leafWatchID, s.ch)
	if err != nil {
		return err
	}

	// Watch for intention updates
	err = s.cache.Notify(s.ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
		Datacenter:   s.source.Datacenter,
		QueryOptions: structs.QueryOptions{Token: s.token},
		Match: &structs.IntentionQueryMatch{
			Type: structs.IntentionMatchDestination,
			Entries: []structs.IntentionMatchEntry{
				{
					Namespace: structs.IntentionDefaultNamespace,
					Name:      s.proxyCfg.DestinationServiceName,
				},
			},
		},
	}, intentionsWatchID, s.ch)
	if err != nil {
		return err
	}

	// Watch for updates to service endpoints for all upstreams
	for _, u := range s.proxyCfg.Upstreams {
		dc := s.source.Datacenter
		if u.Datacenter != "" {
			dc = u.Datacenter
		}

		switch u.DestinationType {
		case structs.UpstreamDestTypePreparedQuery:
			err = s.cache.Notify(s.ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{
				Datacenter:    dc,
				QueryOptions:  structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
				QueryIDOrName: u.DestinationName,
				Connect:       true,
			}, u.Identifier(), s.ch)

		case structs.UpstreamDestTypeService:
			fallthrough

		case "": // Treat unset as the default Service type
			err = s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
				Datacenter:   dc,
				QueryOptions: structs.QueryOptions{Token: s.token},
				ServiceName:  u.DestinationName,
				Connect:      true,
			}, u.Identifier(), s.ch)

		default:
			return fmt.Errorf("unknown upstream type: %q", u.DestinationType)
		}

		// A failed watch registration for any upstream type is fatal.
		if err != nil {
			return err
		}
	}
	return nil
}
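
// exampleExtraWatch is an illustrative sketch (not called anywhere; the
// "extra-service:" correlation ID prefix is hypothetical and would not be
// recognised by handleUpdate) showing the pattern the watches above share:
// every cache.Notify registration feeds the same channel s.ch, and events
// are told apart only by the correlation ID given at registration time.
func exampleExtraWatch(s *state, serviceName string) error {
	return s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
		Datacenter:   s.source.Datacenter,
		QueryOptions: structs.QueryOptions{Token: s.token},
		ServiceName:  serviceName,
		Connect:      true,
	}, "extra-service:"+serviceName, s.ch)
}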

func (s *state) run() {
	// Close the channel we return from Watch when we stop so consumers can
	// stop watching and clean up their goroutines. It's important we do this
	// here and not in Close since this routine sends on this chan and so
	// might panic if it gets closed from another goroutine.
	defer close(s.snapCh)

	snap := ConfigSnapshot{
		ProxyID:           s.proxyID,
		Address:           s.address,
		Port:              s.port,
		Proxy:             s.proxyCfg,
		UpstreamEndpoints: make(map[string]structs.CheckServiceNodes),
	}

	// Using a time.Timer's C channel directly in the select below turns out
	// to be really fiddly/painful, since you can't tell whether a timer is
	// stopped or still waiting in order to know when to reset it. So instead
	// use a plain chan the timer's goroutine sends ourselves messages on.
	sendCh := make(chan struct{})
	var coalesceTimer *time.Timer

	for {
		select {
		case <-s.ctx.Done():
			return
		case u := <-s.ch:
			if err := s.handleUpdate(u, &snap); err != nil {
				s.logger.Printf("[ERR] %s watch error: %s", u.CorrelationID, err)
				continue
			}
		case <-sendCh:
			// Make a deep copy of snap so we don't mutate any of the
			// embedded structs etc on future updates.
			snapCopy, err := snap.Clone()
			if err != nil {
				s.logger.Printf("[ERR] Failed to copy config snapshot for proxy %s: %s",
					s.proxyID, err)
				continue
			}
			s.snapCh <- *snapCopy

			// Allow the next change to trigger a send
			coalesceTimer = nil

			// Skip the rest of the loop; we just delivered the coalesced
			// snapshot, so there is no new change to evaluate this iteration.
			continue
		case replyCh := <-s.reqCh:
			if !snap.Valid() {
				// Not valid yet; just respond with nil and move on to the
				// next task.
				replyCh <- nil
				continue
			}
			// Make a deep copy of snap so we don't mutate any of the
			// embedded structs etc on future updates.
			snapCopy, err := snap.Clone()
			if err != nil {
				s.logger.Printf("[ERR] Failed to copy config snapshot for proxy %s: %s",
					s.proxyID, err)
				continue
			}
			replyCh <- snapCopy

			// Skip the rest of the loop; serving a request doesn't change
			// the snapshot, so there is nothing new to send.
			continue
		}

		// Check if snap is complete enough to be a valid config to deliver
		// to a proxy yet.
		if snap.Valid() {
			// Don't send it right away; set a short timer that will wait for
			// updates from any of the other cache values and deliver them
			// all together.
			if coalesceTimer == nil {
				coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
					// This runs in another goroutine so we can't just do the
					// send directly here as access to snap is racy. Instead,
					// signal the main loop above.
					sendCh <- struct{}{}
				})
			}
		}
	}
}
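
// exampleCoalesce is a self-contained, illustrative reduction (not used by
// this package) of the coalescing pattern run uses above: the first change
// arms a timer, later changes inside the window are absorbed, and the timer
// goroutine signals the owning loop over a channel instead of touching
// shared state directly.
func exampleCoalesce(changes <-chan struct{}, flush func()) {
	sendCh := make(chan struct{})
	var timer *time.Timer

	for {
		select {
		case _, ok := <-changes:
			if !ok {
				return
			}
			if timer == nil {
				// First change in this window; schedule a single flush.
				timer = time.AfterFunc(coalesceTimeout, func() {
					sendCh <- struct{}{}
				})
			}
		case <-sendCh:
			// The timer fired in its own goroutine; flush from this loop
			// where it is safe to touch the owner's state.
			flush()
			timer = nil
		}
	}
}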

func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
	switch u.CorrelationID {
	case rootsWatchID:
		roots, ok := u.Result.(*structs.IndexedCARoots)
		if !ok {
			return fmt.Errorf("invalid type for roots response: %T", u.Result)
		}
		snap.Roots = roots
	case leafWatchID:
		leaf, ok := u.Result.(*structs.IssuedCert)
		if !ok {
			return fmt.Errorf("invalid type for leaf response: %T", u.Result)
		}
		snap.Leaf = leaf
	case intentionsWatchID:
		// Not in snapshot currently, no-op
	default:
		// Service discovery result; figure out which type from the
		// correlation ID prefix.
		switch {
		case strings.HasPrefix(u.CorrelationID, serviceIDPrefix):
			resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
			if !ok {
				return fmt.Errorf("invalid type for service response: %T", u.Result)
			}
			snap.UpstreamEndpoints[u.CorrelationID] = resp.Nodes

		case strings.HasPrefix(u.CorrelationID, preparedQueryIDPrefix):
			resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
			if !ok {
				return fmt.Errorf("invalid type for prepared query response: %T", u.Result)
			}
			snap.UpstreamEndpoints[u.CorrelationID] = resp.Nodes

		default:
			return fmt.Errorf("unknown correlation ID: %q", u.CorrelationID)
		}
	}
	return nil
}
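
// exampleApplyRoots is an illustrative sketch (not used elsewhere in this
// package) of the handleUpdate contract: each event carries an opaque Result
// that is type-asserted according to the correlation ID its watch was
// registered under.
func exampleApplyRoots(s *state, snap *ConfigSnapshot, roots *structs.IndexedCARoots) error {
	return s.handleUpdate(cache.UpdateEvent{
		CorrelationID: rootsWatchID,
		Result:        roots,
	}, snap)
}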

// CurrentSnapshot synchronously returns the current ConfigSnapshot if there
// is one ready. If we don't have one yet because not all the necessary parts
// have been returned (i.e. both roots and leaf cert), nil is returned.
func (s *state) CurrentSnapshot() *ConfigSnapshot {
	// Make a chan for the response to be sent on
	ch := make(chan *ConfigSnapshot, 1)
	s.reqCh <- ch

	// Wait for the response
	return <-ch
}
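
// exampleAwaitSnapshot is an illustrative helper sketch (not part of this
// package's API; the timeout and poll interval are arbitrary) showing how a
// caller might poll CurrentSnapshot, which returns nil until both the roots
// and the leaf cert have arrived.
func exampleAwaitSnapshot(s *state, timeout time.Duration) *ConfigSnapshot {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if snap := s.CurrentSnapshot(); snap != nil {
			return snap
		}
		time.Sleep(50 * time.Millisecond)
	}
	return nil
}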

// Changed returns whether the passed NodeService or token differs from the
// registration this state was built from in any of the fields that matter
// for config state watching.
func (s *state) Changed(ns *structs.NodeService, token string) bool {
	if ns == nil {
		return true
	}
	return ns.Kind != structs.ServiceKindConnectProxy ||
		s.proxyID != ns.ID ||
		s.address != ns.Address ||
		s.port != ns.Port ||
		!reflect.DeepEqual(s.proxyCfg, ns.Proxy) ||
		s.token != token
}
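
// exampleSyncProxy is an illustrative sketch (the surrounding manager
// semantics are assumed, not defined in this package) of how Changed is
// meant to be used: on re-registration an unchanged proxy keeps its running
// state and watches, while any relevant change discards the old state and
// builds a fresh one.
func exampleSyncProxy(existing *state, ns *structs.NodeService, token string) (*state, error) {
	if existing != nil && !existing.Changed(ns, token) {
		// Nothing relevant changed; keep the existing watches running.
		return existing, nil
	}
	if existing != nil {
		existing.Close()
	}
	return newState(ns, token)
}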