Expose HTTP-based paths through Connect proxy (#6446)

Fixes: #5396

This PR adds a proxy configuration stanza called expose. These flags register
listeners in Connect sidecar proxies to allow requests to specific HTTP paths from outside of the node. This allows services to protect themselves by only
listening on the loopback interface, while still accepting traffic from non
Connect-enabled services.

Under expose there is a boolean checks flag that would automatically expose all
registered HTTP and gRPC check paths.

This stanza also accepts a paths list to expose individual paths. The primary
use case for this functionality would be to expose paths for third parties like
Prometheus or the kubelet.

Listeners for requests to exposed paths are be configured dynamically at run
time. Any time a proxy, or check can be registered, a listener can also be
created.

In this initial implementation requests to these paths are not
authenticated/encrypted.
pull/6557/head
Freddy 2019-09-25 20:55:52 -06:00 committed by GitHub
parent f6b928043f
commit fdd10dd8b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 3278 additions and 213 deletions

View File

@ -6,6 +6,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/hashicorp/go-memdb"
"io"
"io/ioutil"
"log"
@ -13,6 +14,7 @@ import (
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
@ -20,7 +22,7 @@ import (
"google.golang.org/grpc"
metrics "github.com/armon/go-metrics"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/ae"
"github.com/hashicorp/consul/agent/cache"
@ -42,8 +44,8 @@ import (
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
multierror "github.com/hashicorp/go-multierror"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
@ -77,6 +79,18 @@ const (
// ID of the leaf watch
leafWatchID = "leaf"
// maxQueryTime is used to bound the limit of a blocking query
maxQueryTime = 600 * time.Second
// defaultQueryTime is the amount of time we block waiting for a change
// if no time is specified. Previously we would wait the maxQueryTime.
defaultQueryTime = 300 * time.Second
)
var (
httpAddrRE = regexp.MustCompile(`^(http[s]?://)(\[.*?\]|\[?[\w\-\.]+)(:\d+)?([^?]*)(\?.*)?$`)
grpcAddrRE = regexp.MustCompile("(.*)((?::)(?:[0-9]+))(.*)$")
)
type configSource int
@ -206,6 +220,9 @@ type Agent struct {
// checkAliases maps the check ID to an associated Alias checks
checkAliases map[types.CheckID]*checks.CheckAlias
// exposedPorts tracks listener ports for checks exposed through a proxy
exposedPorts map[string]int
// stateLock protects the agent state
stateLock sync.Mutex
@ -691,6 +708,8 @@ func (a *Agent) listenAndServeGRPC() error {
CfgMgr: a.proxyConfig,
Authz: a,
ResolveToken: a.resolveToken,
CheckFetcher: a,
CfgFetcher: a,
}
a.xdsServer.Initialize()
@ -1836,7 +1855,7 @@ func (a *Agent) ResumeSync() {
// syncPausedCh returns either a channel or nil. If nil sync is not paused. If
// non-nil, the channel will be closed when sync resumes.
func (a *Agent) syncPausedCh() <-chan struct{} {
func (a *Agent) SyncPausedCh() <-chan struct{} {
a.syncMu.Lock()
defer a.syncMu.Unlock()
return a.syncCh
@ -2265,7 +2284,7 @@ func (a *Agent) addServiceInternal(req *addServiceRequest) error {
}
// cleanup, store the ids of services and checks that weren't previously
// registered so we clean them up if somthing fails halfway through the
// registered so we clean them up if something fails halfway through the
// process.
var cleanupServices []string
var cleanupChecks []types.CheckID
@ -2301,6 +2320,19 @@ func (a *Agent) addServiceInternal(req *addServiceRequest) error {
}
}
// If a proxy service wishes to expose checks, check targets need to be rerouted to the proxy listener
// This needs to be called after chkTypes are added to the agent, to avoid being overwritten
if service.Proxy.Expose.Checks {
err := a.rerouteExposedChecks(service.Proxy.DestinationServiceID, service.Proxy.LocalServiceAddress)
if err != nil {
a.logger.Println("[WARN] failed to reroute L7 checks to exposed proxy listener")
}
} else {
// Reset check targets if proxy was re-registered but no longer wants to expose checks
// If the proxy is being registered for the first time then this is a no-op
a.resetExposedChecks(service.Proxy.DestinationServiceID)
}
if persistServiceConfig && a.config.DataDir != "" {
var err error
if persistDefaults != nil {
@ -2437,6 +2469,14 @@ func (a *Agent) removeServiceLocked(serviceID string, persist bool) error {
a.serviceManager.RemoveService(serviceID)
}
// Reset the HTTP check targets if they were exposed through a proxy
// If this is not a proxy or checks were not exposed then this is a no-op
svc := a.State.Service(serviceID)
if svc != nil {
a.resetExposedChecks(svc.Proxy.DestinationServiceID)
}
checks := a.State.Checks()
var checkIDs []types.CheckID
for id, check := range checks {
@ -2573,6 +2613,19 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
if chkType.OutputMaxSize > 0 && maxOutputSize > chkType.OutputMaxSize {
maxOutputSize = chkType.OutputMaxSize
}
// Get the address of the proxy for this service if it exists
// Need its config to know whether we should reroute checks to it
var proxy *structs.NodeService
if service != nil {
for _, svc := range a.State.Services() {
if svc.Proxy.DestinationServiceID == service.ID {
proxy = svc
break
}
}
}
switch {
case chkType.IsTTL():
@ -2584,6 +2637,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
ttl := &checks.CheckTTL{
Notify: a.State,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
TTL: chkType.TTL,
Logger: a.logger,
OutputMaxSize: maxOutputSize,
@ -2614,6 +2668,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
http := &checks.CheckHTTP{
Notify: a.State,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
HTTP: chkType.HTTP,
Header: chkType.Header,
Method: chkType.Method,
@ -2623,6 +2678,16 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
OutputMaxSize: maxOutputSize,
TLSClientConfig: tlsClientConfig,
}
if proxy != nil && proxy.Proxy.Expose.Checks {
port, err := a.listenerPortLocked(service.ID, string(http.CheckID))
if err != nil {
a.logger.Printf("[ERR] agent: error exposing check: %s", err)
return err
}
http.ProxyHTTP = httpInjectAddr(http.HTTP, proxy.Proxy.LocalServiceAddress, port)
}
http.Start()
a.checkHTTPs[check.CheckID] = http
@ -2638,12 +2703,13 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
}
tcp := &checks.CheckTCP{
Notify: a.State,
CheckID: check.CheckID,
TCP: chkType.TCP,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
Notify: a.State,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
TCP: chkType.TCP,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
}
tcp.Start()
a.checkTCPs[check.CheckID] = tcp
@ -2667,12 +2733,23 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
grpc := &checks.CheckGRPC{
Notify: a.State,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
GRPC: chkType.GRPC,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
Logger: a.logger,
TLSClientConfig: tlsClientConfig,
}
if proxy != nil && proxy.Proxy.Expose.Checks {
port, err := a.listenerPortLocked(service.ID, string(grpc.CheckID))
if err != nil {
a.logger.Printf("[ERR] agent: error exposing check: %s", err)
return err
}
grpc.ProxyGRPC = grpcInjectAddr(grpc.GRPC, proxy.Proxy.LocalServiceAddress, port)
}
grpc.Start()
a.checkGRPCs[check.CheckID] = grpc
@ -2700,6 +2777,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
dockerCheck := &checks.CheckDocker{
Notify: a.State,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
DockerContainerID: chkType.DockerContainerID,
Shell: chkType.Shell,
ScriptArgs: chkType.ScriptArgs,
@ -2726,6 +2804,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
monitor := &checks.CheckMonitor{
Notify: a.State,
CheckID: check.CheckID,
ServiceID: check.ServiceID,
ScriptArgs: chkType.ScriptArgs,
Interval: chkType.Interval,
Timeout: chkType.Timeout,
@ -2768,6 +2847,16 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
return fmt.Errorf("Check type is not valid")
}
// Notify channel that watches for service state changes
// This is a non-blocking send to avoid synchronizing on a large number of check updates
s := a.State.ServiceState(check.ServiceID)
if s != nil && !s.Deleted {
select {
case s.WatchCh <- struct{}{}:
default:
}
}
if chkType.DeregisterCriticalServiceAfter > 0 {
timeout := chkType.DeregisterCriticalServiceAfter
if timeout < a.config.CheckDeregisterIntervalMin {
@ -2800,6 +2889,28 @@ func (a *Agent) removeCheckLocked(checkID types.CheckID, persist bool) error {
return fmt.Errorf("CheckID missing")
}
// Notify channel that watches for service state changes
// This is a non-blocking send to avoid synchronizing on a large number of check updates
var svcID string
for _, c := range a.State.Checks() {
if c.CheckID == checkID {
svcID = c.ServiceID
break
}
}
s := a.State.ServiceState(svcID)
if s != nil && !s.Deleted {
select {
case s.WatchCh <- struct{}{}:
default:
}
}
// Delete port from allocated port set
// If checks weren't being exposed then this is a no-op
portKey := listenerPortKey(svcID, string(checkID))
delete(a.exposedPorts, portKey)
a.cancelCheckMonitors(checkID)
a.State.RemoveCheck(checkID)
@ -2811,10 +2922,33 @@ func (a *Agent) removeCheckLocked(checkID types.CheckID, persist bool) error {
return err
}
}
a.logger.Printf("[DEBUG] agent: removed check %q", checkID)
return nil
}
func (a *Agent) ServiceHTTPBasedChecks(serviceID string) []structs.CheckType {
a.stateLock.Lock()
defer a.stateLock.Unlock()
var chkTypes = make([]structs.CheckType, 0)
for _, c := range a.checkHTTPs {
if c.ServiceID == serviceID {
chkTypes = append(chkTypes, c.CheckType())
}
}
for _, c := range a.checkGRPCs {
if c.ServiceID == serviceID {
chkTypes = append(chkTypes, c.CheckType())
}
}
return chkTypes
}
func (a *Agent) AdvertiseAddrLAN() string {
return a.config.AdvertiseAddrLAN.String()
}
// resolveProxyCheckAddress returns the best address to use for a TCP check of
// the proxy's public listener. It expects the input to already have default
// values populated by applyProxyConfigDefaults. It may return an empty string
@ -3623,6 +3757,65 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
return nil
}
// LocalBlockingQuery performs a blocking query in a generic way against
// local agent state that has no RPC or raft to back it. It uses `hash` parameter
// instead of an `index`.
// `alwaysBlock` determines whether we block if the provided hash is empty.
// Callers like the AgentService endpoint will want to return the current result if a hash isn't provided.
// On the other hand, for cache notifications we always want to block. This avoids an empty first response.
func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Duration,
fn func(ws memdb.WatchSet) (string, interface{}, error)) (string, interface{}, error) {
// If we are not blocking we can skip tracking and allocating - nil WatchSet
// is still valid to call Add on and will just be a no op.
var ws memdb.WatchSet
var timeout *time.Timer
if alwaysBlock || hash != "" {
if wait == 0 {
wait = defaultQueryTime
}
if wait > 10*time.Minute {
wait = maxQueryTime
}
// Apply a small amount of jitter to the request.
wait += lib.RandomStagger(wait / 16)
timeout = time.NewTimer(wait)
}
for {
// Must reset this every loop in case the Watch set is already closed but
// hash remains same. In that case we'll need to re-block on ws.Watch()
// again.
ws = memdb.NewWatchSet()
curHash, curResp, err := fn(ws)
if err != nil {
return "", curResp, err
}
// Return immediately if there is no timeout, the hash is different or the
// Watch returns true (indicating timeout fired). Note that Watch on a nil
// WatchSet immediately returns false which would incorrectly cause this to
// loop and repeat again, however we rely on the invariant that ws == nil
// IFF timeout == nil in which case the Watch call is never invoked.
if timeout == nil || hash != curHash || ws.Watch(timeout.C) {
return curHash, curResp, err
}
// Watch returned false indicating a change was detected, loop and repeat
// the callback to load the new value. If agent sync is paused it means
// local state is currently being bulk-edited e.g. config reload. In this
// case it's likely that local state just got unloaded and may or may not be
// reloaded yet. Wait a short amount of time for Sync to resume to ride out
// typical config reloads.
if syncPauseCh := a.SyncPausedCh(); syncPauseCh != nil {
select {
case <-syncPauseCh:
case <-timeout.C:
}
}
}
}
// registerCache configures the cache and registers all the supported
// types onto the cache. This is NOT safe to call multiple times so
// care should be taken to call this exactly once after the cache
@ -3744,6 +3937,139 @@ func (a *Agent) registerCache() {
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecks{
Agent: a,
}, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
}
func (a *Agent) LocalState() *local.State {
return a.State
}
// rerouteExposedChecks will inject proxy address into check targets
// Future calls to check() will dial the proxy listener
// The agent stateLock MUST be held for this to be called
func (a *Agent) rerouteExposedChecks(serviceID string, proxyAddr string) error {
for _, c := range a.checkHTTPs {
if c.ServiceID != serviceID {
continue
}
port, err := a.listenerPortLocked(serviceID, string(c.CheckID))
if err != nil {
return err
}
c.ProxyHTTP = httpInjectAddr(c.HTTP, proxyAddr, port)
}
for _, c := range a.checkGRPCs {
if c.ServiceID != serviceID {
continue
}
port, err := a.listenerPortLocked(serviceID, string(c.CheckID))
if err != nil {
return err
}
c.ProxyGRPC = grpcInjectAddr(c.GRPC, proxyAddr, port)
}
return nil
}
// resetExposedChecks will set Proxy addr in HTTP checks to empty string
// Future calls to check() will use the original target c.HTTP or c.GRPC
// The agent stateLock MUST be held for this to be called
func (a *Agent) resetExposedChecks(serviceID string) {
ids := make([]string, 0)
for _, c := range a.checkHTTPs {
if c.ServiceID == serviceID {
c.ProxyHTTP = ""
ids = append(ids, string(c.CheckID))
}
}
for _, c := range a.checkGRPCs {
if c.ServiceID == serviceID {
c.ProxyGRPC = ""
ids = append(ids, string(c.CheckID))
}
}
for _, checkID := range ids {
delete(a.exposedPorts, listenerPortKey(serviceID, checkID))
}
}
// listenerPort allocates a port from the configured range
// The agent stateLock MUST be held when this is called
func (a *Agent) listenerPortLocked(svcID, checkID string) (int, error) {
key := listenerPortKey(svcID, checkID)
if a.exposedPorts == nil {
a.exposedPorts = make(map[string]int)
}
if p, ok := a.exposedPorts[key]; ok {
return p, nil
}
allocated := make(map[int]bool)
for _, v := range a.exposedPorts {
allocated[v] = true
}
var port int
for i := 0; i < a.config.ExposeMaxPort-a.config.ExposeMinPort; i++ {
port = a.config.ExposeMinPort + i
if !allocated[port] {
a.exposedPorts[key] = port
break
}
}
if port == 0 {
return 0, fmt.Errorf("no ports available to expose '%s'", checkID)
}
return port, nil
}
func listenerPortKey(svcID, checkID string) string {
return fmt.Sprintf("%s:%s", svcID, checkID)
}
// grpcInjectAddr injects an ip and port into an address of the form: ip:port[/service]
func grpcInjectAddr(existing string, ip string, port int) string {
portRepl := fmt.Sprintf("${1}:%d${3}", port)
out := grpcAddrRE.ReplaceAllString(existing, portRepl)
addrRepl := fmt.Sprintf("%s${2}${3}", ip)
out = grpcAddrRE.ReplaceAllString(out, addrRepl)
return out
}
// httpInjectAddr injects a port then an IP into a URL
func httpInjectAddr(url string, ip string, port int) string {
portRepl := fmt.Sprintf("${1}${2}:%d${4}${5}", port)
out := httpAddrRE.ReplaceAllString(url, portRepl)
// Ensure that ipv6 addr is enclosed in brackets (RFC 3986)
ip = fixIPv6(ip)
addrRepl := fmt.Sprintf("${1}%s${3}${4}${5}", ip)
out = httpAddrRE.ReplaceAllString(out, addrRepl)
return out
}
func fixIPv6(address string) string {
if strings.Count(address, ":") < 2 {
return address
}
if !strings.HasSuffix(address, "]") {
address = address + "]"
}
if !strings.HasPrefix(address, "[") {
address = "[" + address
}
return address
}
// defaultIfEmpty returns the value if not empty otherwise the default value.

View File

@ -3,15 +3,13 @@ package agent
import (
"encoding/json"
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/mitchellh/hashstructure"
"log"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/hashicorp/go-memdb"
"github.com/mitchellh/hashstructure"
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
@ -24,7 +22,7 @@ import (
"github.com/hashicorp/consul/lib/file"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/logutils"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
@ -262,7 +260,7 @@ func (s *HTTPServer) AgentService(resp http.ResponseWriter, req *http.Request) (
// in QueryOptions but I didn't want to make very general changes right away.
hash := req.URL.Query().Get("hash")
return s.agentLocalBlockingQuery(resp, hash, &queryOpts,
resultHash, service, err := s.agent.LocalBlockingQuery(false, hash, queryOpts.MaxQueryTime,
func(ws memdb.WatchSet) (string, interface{}, error) {
svcState := s.agent.State.ServiceState(id)
@ -299,7 +297,12 @@ func (s *HTTPServer) AgentService(resp http.ResponseWriter, req *http.Request) (
reply.ContentHash = fmt.Sprintf("%x", rawHash)
return reply.ContentHash, reply, nil
})
},
)
if resultHash != "" {
resp.Header().Set("X-Consul-ContentHash", resultHash)
}
return service, err
}
func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@ -769,6 +772,9 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
"local_service_address": "LocalServiceAddress",
// SidecarService
"sidecar_service": "SidecarService",
// Expose Config
"local_path_port": "LocalPathPort",
"listener_port": "ListenerPort",
// DON'T Recurse into these opaque config maps or we might mangle user's
// keys. Note empty canonical is a special sentinel to prevent recursion.
@ -1297,68 +1303,6 @@ func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.
return reply, nil
}
type agentLocalBlockingFunc func(ws memdb.WatchSet) (string, interface{}, error)
// agentLocalBlockingQuery performs a blocking query in a generic way against
// local agent state that has no RPC or raft to back it. It uses `hash` parameter
// instead of an `index`. The resp is needed to write the `X-Consul-ContentHash`
// header back on return no Status nor body content is ever written to it.
func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash string,
queryOpts *structs.QueryOptions, fn agentLocalBlockingFunc) (interface{}, error) {
// If we are not blocking we can skip tracking and allocating - nil WatchSet
// is still valid to call Add on and will just be a no op.
var ws memdb.WatchSet
var timeout *time.Timer
if hash != "" {
// TODO(banks) at least define these defaults somewhere in a const. Would be
// nice not to duplicate the ones in consul/rpc.go too...
wait := queryOpts.MaxQueryTime
if wait == 0 {
wait = 5 * time.Minute
}
if wait > 10*time.Minute {
wait = 10 * time.Minute
}
// Apply a small amount of jitter to the request.
wait += lib.RandomStagger(wait / 16)
timeout = time.NewTimer(wait)
}
for {
// Must reset this every loop in case the Watch set is already closed but
// hash remains same. In that case we'll need to re-block on ws.Watch()
// again.
ws = memdb.NewWatchSet()
curHash, curResp, err := fn(ws)
if err != nil {
return curResp, err
}
// Return immediately if there is no timeout, the hash is different or the
// Watch returns true (indicating timeout fired). Note that Watch on a nil
// WatchSet immediately returns false which would incorrectly cause this to
// loop and repeat again, however we rely on the invariant that ws == nil
// IFF timeout == nil in which case the Watch call is never invoked.
if timeout == nil || hash != curHash || ws.Watch(timeout.C) {
resp.Header().Set("X-Consul-ContentHash", curHash)
return curResp, err
}
// Watch returned false indicating a change was detected, loop and repeat
// the callback to load the new value. If agent sync is paused it means
// local state is currently being bulk-edited e.g. config reload. In this
// case it's likely that local state just got unloaded and may or may not be
// reloaded yet. Wait a short amount of time for Sync to resume to ride out
// typical config reloads.
if syncPauseCh := s.agent.syncPausedCh(); syncPauseCh != nil {
select {
case <-syncPauseCh:
case <-timeout.C:
}
}
}
}
// AgentConnectAuthorize
//
// POST /v1/agent/connect/authorize

View File

@ -325,7 +325,7 @@ func TestAgent_Service(t *testing.T) {
Service: "web-sidecar-proxy",
Port: 8000,
Proxy: expectProxy.ToAPI(),
ContentHash: "f5826efc5ffc207a",
ContentHash: "4c7d5f8d3748be6d",
Weights: api.AgentWeights{
Passing: 1,
Warning: 1,
@ -337,7 +337,7 @@ func TestAgent_Service(t *testing.T) {
// Copy and modify
updatedResponse := *expectedResponse
updatedResponse.Port = 9999
updatedResponse.ContentHash = "c8cb04cb77ef33d8"
updatedResponse.ContentHash = "713435ba1f5badcf"
// Simple response for non-proxy service registered in TestAgent config
expectWebResponse := &api.AgentService{
@ -1980,39 +1980,51 @@ func TestAgent_RegisterCheck_ACLDeny(t *testing.T) {
require.NotNil(t, nodeToken)
t.Run("no token - node check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register", jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(t, acl.IsErrPermissionDenied(err))
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register", jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(r, acl.IsErrPermissionDenied(err))
})
})
t.Run("svc token - node check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+svcToken.SecretID, jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(t, acl.IsErrPermissionDenied(err))
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+svcToken.SecretID, jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(r, acl.IsErrPermissionDenied(err))
})
})
t.Run("node token - node check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+nodeToken.SecretID, jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+nodeToken.SecretID, jsonReader(nodeCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.NoError(r, err)
})
})
t.Run("no token - svc check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register", jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(t, acl.IsErrPermissionDenied(err))
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register", jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(r, acl.IsErrPermissionDenied(err))
})
})
t.Run("node token - svc check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+nodeToken.SecretID, jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(t, acl.IsErrPermissionDenied(err))
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+nodeToken.SecretID, jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.True(r, acl.IsErrPermissionDenied(err))
})
})
t.Run("svc token - svc check", func(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+svcToken.SecretID, jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
req, _ := http.NewRequest("PUT", "/v1/agent/check/register?token="+svcToken.SecretID, jsonReader(svcCheck))
_, err := a.srv.AgentRegisterCheck(nil, req)
require.NoError(r, err)
})
})
}
@ -5386,3 +5398,43 @@ func TestAgent_HostBadACL(t *testing.T) {
assert.Equal(http.StatusOK, resp.Code)
assert.Nil(respRaw)
}
// Thie tests that a proxy with an ExposeConfig is returned as expected.
func TestAgent_Services_ExposeConfig(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
srv1 := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "proxy-id",
Service: "proxy-name",
Port: 8443,
Proxy: structs.ConnectProxyConfig{
Expose: structs.ExposeConfig{
Checks: true,
Paths: []structs.ExposePath{
{
ListenerPort: 8080,
LocalPathPort: 21500,
Protocol: "http2",
Path: "/metrics",
},
},
},
},
}
a.State.AddService(srv1, "")
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
obj, err := a.srv.AgentServices(nil, req)
require.NoError(t, err)
val := obj.(map[string]*api.AgentService)
require.Len(t, val, 1)
actual := val["proxy-id"]
require.NotNil(t, actual)
require.Equal(t, api.ServiceKindConnectProxy, actual.Kind)
require.Equal(t, srv1.Proxy.ToAPI(), actual.Proxy)
}

View File

@ -29,7 +29,7 @@ import (
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/types"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/go-uuid"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -3538,6 +3538,205 @@ func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) {
require.Equal(t, uint64(812345), a.consulConfig().RaftConfig.TrailingLogs)
}
func TestAgent_grpcInjectAddr(t *testing.T) {
tt := []struct {
name string
grpc string
ip string
port int
want string
}{
{
name: "localhost web svc",
grpc: "localhost:8080/web",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090/web",
},
{
name: "localhost no svc",
grpc: "localhost:8080",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090",
},
{
name: "ipv4 web svc",
grpc: "127.0.0.1:8080/web",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090/web",
},
{
name: "ipv4 no svc",
grpc: "127.0.0.1:8080",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090",
},
{
name: "ipv6 no svc",
grpc: "2001:db8:1f70::999:de8:7648:6e8:5000",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090",
},
{
name: "ipv6 web svc",
grpc: "2001:db8:1f70::999:de8:7648:6e8:5000/web",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090/web",
},
{
name: "zone ipv6 web svc",
grpc: "::FFFF:C0A8:1%1:5000/web",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090/web",
},
{
name: "ipv6 literal web svc",
grpc: "::FFFF:192.168.0.1:5000/web",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090/web",
},
{
name: "ipv6 injected into ipv6 url",
grpc: "2001:db8:1f70::999:de8:7648:6e8:5000",
ip: "::FFFF:C0A8:1",
port: 9090,
want: "::FFFF:C0A8:1:9090",
},
{
name: "ipv6 injected into ipv6 url with svc",
grpc: "2001:db8:1f70::999:de8:7648:6e8:5000/web",
ip: "::FFFF:C0A8:1",
port: 9090,
want: "::FFFF:C0A8:1:9090/web",
},
{
name: "ipv6 injected into ipv6 url with special",
grpc: "2001:db8:1f70::999:de8:7648:6e8:5000/service-$name:with@special:Chars",
ip: "::FFFF:C0A8:1",
port: 9090,
want: "::FFFF:C0A8:1:9090/service-$name:with@special:Chars",
},
}
for _, tt := range tt {
t.Run(tt.name, func(t *testing.T) {
got := grpcInjectAddr(tt.grpc, tt.ip, tt.port)
if got != tt.want {
t.Errorf("httpInjectAddr() got = %v, want %v", got, tt.want)
}
})
}
}
func TestAgent_httpInjectAddr(t *testing.T) {
tt := []struct {
name string
url string
ip string
port int
want string
}{
{
name: "localhost health",
url: "http://localhost:8080/health",
ip: "192.168.0.0",
port: 9090,
want: "http://192.168.0.0:9090/health",
},
{
name: "https localhost health",
url: "https://localhost:8080/health",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090/health",
},
{
name: "https ipv4 health",
url: "https://127.0.0.1:8080/health",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090/health",
},
{
name: "https ipv4 without path",
url: "https://127.0.0.1:8080",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090",
},
{
name: "https ipv6 health",
url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000/health",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090/health",
},
{
name: "https ipv6 with zone",
url: "https://[::FFFF:C0A8:1%1]:5000/health",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090/health",
},
{
name: "https ipv6 literal",
url: "https://[::FFFF:192.168.0.1]:5000/health",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090/health",
},
{
name: "https ipv6 without path",
url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090",
},
{
name: "ipv6 injected into ipv6 url",
url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000",
ip: "::FFFF:C0A8:1",
port: 9090,
want: "https://[::FFFF:C0A8:1]:9090",
},
{
name: "ipv6 with brackets injected into ipv6 url",
url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000",
ip: "[::FFFF:C0A8:1]",
port: 9090,
want: "https://[::FFFF:C0A8:1]:9090",
},
{
name: "short domain health",
url: "http://i.co:8080/health",
ip: "192.168.0.0",
port: 9090,
want: "http://192.168.0.0:9090/health",
},
{
name: "nested url in query",
url: "http://my.corp.com:8080/health?from=http://google.com:8080",
ip: "192.168.0.0",
port: 9090,
want: "http://192.168.0.0:9090/health?from=http://google.com:8080",
},
}
for _, tt := range tt {
t.Run(tt.name, func(t *testing.T) {
got := httpInjectAddr(tt.url, tt.ip, tt.port)
if got != tt.want {
t.Errorf("httpInjectAddr() got = %v, want %v", got, tt.want)
}
})
}
}
func TestDefaultIfEmpty(t *testing.T) {
require.Equal(t, "", defaultIfEmpty("", ""))
require.Equal(t, "foo", defaultIfEmpty("", "foo"))
@ -3574,3 +3773,241 @@ func TestConfigSourceFromName(t *testing.T) {
})
}
}
func TestAgent_RerouteExistingHTTPChecks(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register a service without a ProxyAddr
svc := &structs.NodeService{
ID: "web",
Service: "web",
Address: "localhost",
Port: 8080,
}
chks := []*structs.CheckType{
{
CheckID: "http",
HTTP: "http://localhost:8080/mypath?query",
Interval: 20 * time.Millisecond,
TLSSkipVerify: true,
},
{
CheckID: "grpc",
GRPC: "localhost:8080/myservice",
Interval: 20 * time.Millisecond,
TLSSkipVerify: true,
},
}
if err := a.AddService(svc, chks, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add svc: %v", err)
}
// Register a proxy and expose HTTP checks
// This should trigger setting ProxyHTTP and ProxyGRPC in the checks
proxy := &structs.NodeService{
Kind: "connect-proxy",
ID: "web-proxy",
Service: "web-proxy",
Address: "localhost",
Port: 21500,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "localhost",
LocalServicePort: 8080,
MeshGateway: structs.MeshGatewayConfig{},
Expose: structs.ExposeConfig{
Checks: true,
},
},
}
if err := a.AddService(proxy, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add svc: %v", err)
}
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks("web")
got := chks[0].ProxyHTTP
if got == "" {
r.Fatal("proxyHTTP addr not set in check")
}
want := "http://localhost:21500/mypath?query"
if got != want {
r.Fatalf("unexpected proxy addr in check, want: %s, got: %s", want, got)
}
})
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks("web")
// Will be at a later index than HTTP check because of the fetching order in ServiceHTTPBasedChecks
got := chks[1].ProxyGRPC
if got == "" {
r.Fatal("ProxyGRPC addr not set in check")
}
// Node that this relies on listener ports auto-incrementing in a.listenerPortLocked
want := "localhost:21501/myservice"
if got != want {
r.Fatalf("unexpected proxy addr in check, want: %s, got: %s", want, got)
}
})
// Re-register a proxy and disable exposing HTTP checks
// This should trigger resetting ProxyHTTP and ProxyGRPC to empty strings
proxy = &structs.NodeService{
Kind: "connect-proxy",
ID: "web-proxy",
Service: "web-proxy",
Address: "localhost",
Port: 21500,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "localhost",
LocalServicePort: 8080,
MeshGateway: structs.MeshGatewayConfig{},
Expose: structs.ExposeConfig{
Checks: false,
},
},
}
if err := a.AddService(proxy, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add svc: %v", err)
}
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks("web")
got := chks[0].ProxyHTTP
if got != "" {
r.Fatal("ProxyHTTP addr was not reset")
}
})
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks("web")
// Will be at a later index than HTTP check because of the fetching order in ServiceHTTPBasedChecks
got := chks[1].ProxyGRPC
if got != "" {
r.Fatal("ProxyGRPC addr was not reset")
}
})
}
func TestAgent_RerouteNewHTTPChecks(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register a service without a ProxyAddr
svc := &structs.NodeService{
ID: "web",
Service: "web",
Address: "localhost",
Port: 8080,
}
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add svc: %v", err)
}
// Register a proxy and expose HTTP checks
proxy := &structs.NodeService{
Kind: "connect-proxy",
ID: "web-proxy",
Service: "web-proxy",
Address: "localhost",
Port: 21500,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "localhost",
LocalServicePort: 8080,
MeshGateway: structs.MeshGatewayConfig{},
Expose: structs.ExposeConfig{
Checks: true,
},
},
}
if err := a.AddService(proxy, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add svc: %v", err)
}
checks := []*structs.HealthCheck{
{
CheckID: "http",
Name: "http",
ServiceID: "web",
Status: api.HealthCritical,
},
{
CheckID: "grpc",
Name: "grpc",
ServiceID: "web",
Status: api.HealthCritical,
},
}
chkTypes := []*structs.CheckType{
{
CheckID: "http",
HTTP: "http://localhost:8080/mypath?query",
Interval: 20 * time.Millisecond,
TLSSkipVerify: true,
},
{
CheckID: "grpc",
GRPC: "localhost:8080/myservice",
Interval: 20 * time.Millisecond,
TLSSkipVerify: true,
},
}
// ProxyGRPC and ProxyHTTP should be set when creating check
// since proxy.expose.checks is enabled on the proxy
if err := a.AddCheck(checks[0], chkTypes[0], false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add check: %v", err)
}
if err := a.AddCheck(checks[1], chkTypes[1], false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add check: %v", err)
}
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks("web")
got := chks[0].ProxyHTTP
if got == "" {
r.Fatal("ProxyHTTP addr not set in check")
}
want := "http://localhost:21500/mypath?query"
if got != want {
r.Fatalf("unexpected proxy addr in http check, want: %s, got: %s", want, got)
}
})
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks("web")
// Will be at a later index than HTTP check because of the fetching order in ServiceHTTPBasedChecks
got := chks[1].ProxyGRPC
if got == "" {
r.Fatal("ProxyGRPC addr not set in check")
}
want := "localhost:21501/myservice"
if got != want {
r.Fatalf("unexpected proxy addr in grpc check, want: %s, got: %s", want, got)
}
})
}

View File

@ -0,0 +1,132 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/mitchellh/hashstructure"
"time"
)
// Recommended name for registration.
const ServiceHTTPChecksName = "service-http-checks"
type Agent interface {
ServiceHTTPBasedChecks(id string) []structs.CheckType
LocalState() *local.State
LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Duration,
fn func(ws memdb.WatchSet) (string, interface{}, error)) (string, interface{}, error)
}
// ServiceHTTPBasedChecks supports fetching discovering checks in the local state
type ServiceHTTPChecks struct {
Agent Agent
}
func (c *ServiceHTTPChecks) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a CatalogDatacentersRequest.
reqReal, ok := req.(*ServiceHTTPChecksRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: got wrong request type: %T, want: ServiceHTTPChecksRequest", req)
}
var lastChecks []structs.CheckType
var lastHash string
var err error
// Hash last known result as a baseline
if opts.LastResult != nil {
lastChecks, ok = opts.LastResult.Value.([]structs.CheckType)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: last value in cache of wrong type: %T, want: CheckType", req)
}
lastHash, err = hashChecks(lastChecks)
if err != nil {
return result, fmt.Errorf("Internal cache failure: %v", err)
}
}
hash, resp, err := c.Agent.LocalBlockingQuery(true, lastHash, reqReal.MaxQueryTime,
func(ws memdb.WatchSet) (string, interface{}, error) {
svcState := c.Agent.LocalState().ServiceState(reqReal.ServiceID)
if svcState == nil {
return "", result, fmt.Errorf("Internal cache failure: service '%s' not in agent state", reqReal.ServiceID)
}
// WatchCh will receive updates on service (de)registrations and check (de)registrations
ws.Add(svcState.WatchCh)
reply := c.Agent.ServiceHTTPBasedChecks(reqReal.ServiceID)
hash, err := hashChecks(reply)
if err != nil {
return "", result, fmt.Errorf("Internal cache failure: %v", err)
}
return hash, reply, nil
},
)
result.Value = resp
// Below is a purely synthetic index to keep the caching happy.
if opts.LastResult == nil {
result.Index = 1
return result, nil
}
result.Index = opts.LastResult.Index
if lastHash == "" || hash != lastHash {
result.Index += 1
}
return result, nil
}
func (c *ServiceHTTPChecks) SupportsBlocking() bool {
return true
}
// ServiceHTTPChecksRequest is the cache.Request implementation for the
// ServiceHTTPBasedChecks cache type. This is implemented here and not in structs
// since this is only used for cache-related requests and not forwarded
// directly to any Consul servers.
type ServiceHTTPChecksRequest struct {
ServiceID string
MinQueryIndex uint64
MaxQueryTime time.Duration
}
func (s *ServiceHTTPChecksRequest) CacheInfo() cache.RequestInfo {
return cache.RequestInfo{
Token: "",
Key: ServiceHTTPChecksName + ":" + s.ServiceID,
Datacenter: "",
MinIndex: s.MinQueryIndex,
Timeout: s.MaxQueryTime,
}
}
func hashChecks(checks []structs.CheckType) (string, error) {
if len(checks) == 0 {
return "", nil
}
// Wrapper created to use "set" struct tag, that way ordering doesn't lead to false-positives
wrapper := struct {
ChkTypes []structs.CheckType `hash:"set"`
}{
ChkTypes: checks,
}
b, err := hashstructure.Hash(wrapper, nil)
if err != nil {
return "", fmt.Errorf("failed to hash checks: %v", err)
}
return fmt.Sprintf("%d", b), nil
}

View File

@ -0,0 +1,216 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func TestServiceHTTPChecks_Fetch(t *testing.T) {
chkTypes := []*structs.CheckType{
{
CheckID: "http-check",
HTTP: "localhost:8080/health",
Interval: 5 * time.Second,
OutputMaxSize: checks.DefaultBufSize,
},
{
CheckID: "grpc-check",
GRPC: "localhost:9090/v1.Health",
Interval: 5 * time.Second,
},
{
CheckID: "ttl-check",
TTL: 10 * time.Second,
},
}
svcState := local.ServiceState{
Service: &structs.NodeService{
ID: "web",
},
}
// Create mockAgent and cache type
a := newMockAgent()
a.LocalState().SetServiceState(&svcState)
typ := ServiceHTTPChecks{Agent: a}
// Adding TTL check should not yield result from Fetch since TTL checks aren't tracked
if err := a.AddCheck(*chkTypes[2]); err != nil {
t.Fatalf("failed to add check: %v", err)
}
result, err := typ.Fetch(
cache.FetchOptions{},
&ServiceHTTPChecksRequest{ServiceID: svcState.Service.ID, MaxQueryTime: 100 * time.Millisecond},
)
if err != nil {
t.Fatalf("failed to fetch: %v", err)
}
got, ok := result.Value.([]structs.CheckType)
if !ok {
t.Fatalf("fetched value of wrong type, got %T, want []structs.CheckType", result.Value)
}
require.Empty(t, got)
// Adding HTTP check should yield check in Fetch
if err := a.AddCheck(*chkTypes[0]); err != nil {
t.Fatalf("failed to add check: %v", err)
}
result, err = typ.Fetch(
cache.FetchOptions{},
&ServiceHTTPChecksRequest{ServiceID: svcState.Service.ID},
)
if err != nil {
t.Fatalf("failed to fetch: %v", err)
}
if result.Index != 1 {
t.Fatalf("expected index of 1 after first cache hit, got %d", result.Index)
}
got, ok = result.Value.([]structs.CheckType)
if !ok {
t.Fatalf("fetched value of wrong type, got %T, want []structs.CheckType", result.Value)
}
want := chkTypes[0:1]
for i, c := range want {
require.Equal(t, *c, got[i])
}
// Adding GRPC check should yield both checks in Fetch
if err := a.AddCheck(*chkTypes[1]); err != nil {
t.Fatalf("failed to add check: %v", err)
}
result2, err := typ.Fetch(
cache.FetchOptions{LastResult: &result},
&ServiceHTTPChecksRequest{ServiceID: svcState.Service.ID},
)
if err != nil {
t.Fatalf("failed to fetch: %v", err)
}
if result2.Index != 2 {
t.Fatalf("expected index of 2 after second request, got %d", result2.Index)
}
got, ok = result2.Value.([]structs.CheckType)
if !ok {
t.Fatalf("fetched value of wrong type, got %T, want []structs.CheckType", got)
}
want = chkTypes[0:2]
for i, c := range want {
require.Equal(t, *c, got[i])
}
// Removing GRPC check should yield HTTP check in Fetch
if err := a.RemoveCheck(chkTypes[1].CheckID); err != nil {
t.Fatalf("failed to add check: %v", err)
}
result3, err := typ.Fetch(
cache.FetchOptions{LastResult: &result2},
&ServiceHTTPChecksRequest{ServiceID: svcState.Service.ID},
)
if err != nil {
t.Fatalf("failed to fetch: %v", err)
}
if result3.Index != 3 {
t.Fatalf("expected index of 3 after third request, got %d", result3.Index)
}
got, ok = result3.Value.([]structs.CheckType)
if !ok {
t.Fatalf("fetched value of wrong type, got %T, want []structs.CheckType", got)
}
want = chkTypes[0:1]
for i, c := range want {
require.Equal(t, *c, got[i])
}
// Fetching again should yield no change in result nor index
result4, err := typ.Fetch(
cache.FetchOptions{LastResult: &result3},
&ServiceHTTPChecksRequest{ServiceID: svcState.Service.ID, MaxQueryTime: 100 * time.Millisecond},
)
if err != nil {
t.Fatalf("failed to fetch: %v", err)
}
if result4.Index != 3 {
t.Fatalf("expected index of 3 after fetch timeout, got %d", result4.Index)
}
got, ok = result4.Value.([]structs.CheckType)
if !ok {
t.Fatalf("fetched value of wrong type, got %T, want []structs.CheckType", got)
}
want = chkTypes[0:1]
for i, c := range want {
require.Equal(t, *c, got[i])
}
}
func TestServiceHTTPChecks_badReqType(t *testing.T) {
a := newMockAgent()
typ := ServiceHTTPChecks{Agent: a}
// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(t, err)
require.Contains(t, err.Error(), "wrong request type")
}
type mockAgent struct {
state *local.State
checks []structs.CheckType
}
func newMockAgent() *mockAgent {
m := mockAgent{
state: local.NewState(local.Config{NodeID: "host"}, nil, new(token.Store)),
checks: make([]structs.CheckType, 0),
}
m.state.TriggerSyncChanges = func() {}
return &m
}
func (m *mockAgent) ServiceHTTPBasedChecks(id string) []structs.CheckType {
return m.checks
}
func (m *mockAgent) LocalState() *local.State {
return m.state
}
func (m *mockAgent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Duration,
fn func(ws memdb.WatchSet) (string, interface{}, error)) (string, interface{}, error) {
hash, err := hashChecks(m.checks)
if err != nil {
return "", nil, fmt.Errorf("failed to hash checks: %+v", m.checks)
}
return hash, m.checks, nil
}
func (m *mockAgent) AddCheck(check structs.CheckType) error {
if check.IsHTTP() || check.IsGRPC() {
m.checks = append(m.checks, check)
}
return nil
}
func (m *mockAgent) RemoveCheck(id types.CheckID) error {
new := make([]structs.CheckType, 0)
for _, c := range m.checks {
if c.CheckID != id {
new = append(new, c)
}
}
m.checks = new
return nil
}

View File

@ -3,6 +3,7 @@ package checks
import (
"crypto/tls"
"fmt"
"github.com/hashicorp/consul/agent/structs"
"io"
"io/ioutil"
"log"
@ -58,6 +59,7 @@ type CheckNotifier interface {
type CheckMonitor struct {
Notify CheckNotifier
CheckID types.CheckID
ServiceID string
Script string
ScriptArgs []string
Interval time.Duration
@ -210,10 +212,11 @@ func (c *CheckMonitor) check() {
// but upon the TTL expiring, the check status is
// automatically set to critical.
type CheckTTL struct {
Notify CheckNotifier
CheckID types.CheckID
TTL time.Duration
Logger *log.Logger
Notify CheckNotifier
CheckID types.CheckID
ServiceID string
TTL time.Duration
Logger *log.Logger
timer *time.Timer
@ -308,6 +311,7 @@ func (c *CheckTTL) SetStatus(status, output string) string {
type CheckHTTP struct {
Notify CheckNotifier
CheckID types.CheckID
ServiceID string
HTTP string
Header map[string][]string
Method string
@ -321,6 +325,23 @@ type CheckHTTP struct {
stop bool
stopCh chan struct{}
stopLock sync.Mutex
// Set if checks are exposed through Connect proxies
// If set, this is the target of check()
ProxyHTTP string
}
func (c *CheckHTTP) CheckType() structs.CheckType {
return structs.CheckType{
CheckID: c.CheckID,
HTTP: c.HTTP,
Method: c.Method,
Header: c.Header,
Interval: c.Interval,
ProxyHTTP: c.ProxyHTTP,
Timeout: c.Timeout,
OutputMaxSize: c.OutputMaxSize,
}
}
// Start is used to start an HTTP check.
@ -390,7 +411,12 @@ func (c *CheckHTTP) check() {
method = "GET"
}
req, err := http.NewRequest(method, c.HTTP, nil)
target := c.HTTP
if c.ProxyHTTP != "" {
target = c.ProxyHTTP
}
req, err := http.NewRequest(method, target, nil)
if err != nil {
c.Logger.Printf("[WARN] agent: Check %q HTTP request failed: %s", c.CheckID, err)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
@ -430,7 +456,7 @@ func (c *CheckHTTP) check() {
}
// Format the response body
result := fmt.Sprintf("HTTP %s %s: %s Output: %s", method, c.HTTP, resp.Status, output.String())
result := fmt.Sprintf("HTTP %s %s: %s Output: %s", method, target, resp.Status, output.String())
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
// PASSING (2xx)
@ -456,12 +482,13 @@ func (c *CheckHTTP) check() {
// The check is passing if the connection succeeds
// The check is critical if the connection returns an error
type CheckTCP struct {
Notify CheckNotifier
CheckID types.CheckID
TCP string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
Notify CheckNotifier
CheckID types.CheckID
ServiceID string
TCP string
Interval time.Duration
Timeout time.Duration
Logger *log.Logger
dialer *net.Dialer
stop bool
@ -537,6 +564,7 @@ func (c *CheckTCP) check() {
type CheckDocker struct {
Notify CheckNotifier
CheckID types.CheckID
ServiceID string
Script string
ScriptArgs []string
DockerContainerID string
@ -656,6 +684,7 @@ func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
type CheckGRPC struct {
Notify CheckNotifier
CheckID types.CheckID
ServiceID string
GRPC string
Interval time.Duration
Timeout time.Duration
@ -666,6 +695,20 @@ type CheckGRPC struct {
stop bool
stopCh chan struct{}
stopLock sync.Mutex
// Set if checks are exposed through Connect proxies
// If set, this is the target of check()
ProxyGRPC string
}
func (c *CheckGRPC) CheckType() structs.CheckType {
return structs.CheckType{
CheckID: c.CheckID,
GRPC: c.GRPC,
ProxyGRPC: c.ProxyGRPC,
Interval: c.Interval,
Timeout: c.Timeout,
}
}
func (c *CheckGRPC) Start() {
@ -697,13 +740,18 @@ func (c *CheckGRPC) run() {
}
func (c *CheckGRPC) check() {
err := c.probe.Check()
target := c.GRPC
if c.ProxyGRPC != "" {
target = c.ProxyGRPC
}
err := c.probe.Check(target)
if err != nil {
c.Logger.Printf("[DEBUG] agent: Check %q failed: %s", c.CheckID, err.Error())
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
} else {
c.Logger.Printf("[DEBUG] agent: Check %q is passing", c.CheckID)
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("gRPC check %s: success", c.GRPC))
c.Notify.UpdateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("gRPC check %s: success", target))
}
}

View File

@ -19,7 +19,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/types"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/go-uuid"
)
func uniqueID() string {
@ -328,6 +328,69 @@ func TestCheckHTTP(t *testing.T) {
}
}
func TestCheckHTTP_Proxied(t *testing.T) {
t.Parallel()
proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Proxy Server")
}))
defer proxy.Close()
notif := mock.NewNotify()
check := &CheckHTTP{
Notify: notif,
CheckID: types.CheckID("foo"),
HTTP: "",
Method: "GET",
OutputMaxSize: DefaultBufSize,
Interval: 10 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
ProxyHTTP: proxy.URL,
}
check.Start()
defer check.Stop()
// If ProxyHTTP is set, check() reqs should go to that address
retry.Run(t, func(r *retry.R) {
output := notif.Output("foo")
if !strings.Contains(output, "Proxy Server") {
r.Fatalf("c.ProxyHTTP server did not receive request, but should")
}
})
}
func TestCheckHTTP_NotProxied(t *testing.T) {
t.Parallel()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Original Server")
}))
defer server.Close()
notif := mock.NewNotify()
check := &CheckHTTP{
Notify: notif,
CheckID: types.CheckID("foo"),
HTTP: server.URL,
Method: "GET",
OutputMaxSize: DefaultBufSize,
Interval: 10 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
ProxyHTTP: "",
}
check.Start()
defer check.Stop()
// If ProxyHTTP is not set, check() reqs should go to the address in CheckHTTP.HTTP
retry.Run(t, func(r *retry.R) {
output := notif.Output("foo")
if !strings.Contains(output, "Original Server") {
r.Fatalf("server did not receive request")
}
})
}
func TestCheckHTTPTCP_BigTimeout(t *testing.T) {
testCases := []struct {
timeoutIn, intervalIn, timeoutWant time.Duration

View File

@ -28,7 +28,6 @@ type GrpcHealthProbe struct {
func NewGrpcHealthProbe(target string, timeout time.Duration, tlsConfig *tls.Config) *GrpcHealthProbe {
serverAndService := strings.SplitN(target, "/", 2)
server := serverAndService[0]
request := hv1.HealthCheckRequest{}
if len(serverAndService) > 1 {
request.Service = serverAndService[1]
@ -43,7 +42,6 @@ func NewGrpcHealthProbe(target string, timeout time.Duration, tlsConfig *tls.Con
}
return &GrpcHealthProbe{
server: server,
request: &request,
timeout: timeout,
dialOptions: dialOptions,
@ -52,11 +50,14 @@ func NewGrpcHealthProbe(target string, timeout time.Duration, tlsConfig *tls.Con
// Check if the target of this GrpcHealthProbe is healthy
// If nil is returned, target is healthy, otherwise target is not healthy
func (probe *GrpcHealthProbe) Check() error {
func (probe *GrpcHealthProbe) Check(target string) error {
serverAndService := strings.SplitN(target, "/", 2)
server := serverAndService[0]
ctx, cancel := context.WithTimeout(context.Background(), probe.timeout)
defer cancel()
connection, err := grpc.DialContext(ctx, probe.server, probe.dialOptions...)
connection, err := grpc.DialContext(ctx, server, probe.dialOptions...)
if err != nil {
return err
}

View File

@ -4,6 +4,11 @@ import (
"crypto/tls"
"flag"
"fmt"
"github.com/hashicorp/consul/agent/mock"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/types"
"io/ioutil"
"log"
"net"
"os"
@ -88,7 +93,7 @@ func TestCheck(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
probe := NewGrpcHealthProbe(tt.args.target, tt.args.timeout, tt.args.tlsConfig)
actualError := probe.Check()
actualError := probe.Check(tt.args.target)
actuallyHealthy := actualError == nil
if tt.healthy != actuallyHealthy {
t.Errorf("FAIL: %s. Expected healthy %t, but err == %v", tt.name, tt.healthy, actualError)
@ -96,3 +101,55 @@ func TestCheck(t *testing.T) {
})
}
}
func TestGRPC_Proxied(t *testing.T) {
t.Parallel()
notif := mock.NewNotify()
check := &CheckGRPC{
Notify: notif,
CheckID: types.CheckID("foo"),
GRPC: "",
Interval: 10 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
ProxyGRPC: server,
}
check.Start()
defer check.Stop()
// If ProxyGRPC is set, check() reqs should go to that address
retry.Run(t, func(r *retry.R) {
if got, want := notif.Updates("foo"), 2; got < want {
r.Fatalf("got %d updates want at least %d", got, want)
}
if got, want := notif.State("foo"), api.HealthPassing; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}
func TestGRPC_NotProxied(t *testing.T) {
t.Parallel()
notif := mock.NewNotify()
check := &CheckGRPC{
Notify: notif,
CheckID: types.CheckID("foo"),
GRPC: server,
Interval: 10 * time.Millisecond,
Logger: log.New(ioutil.Discard, uniqueID(), log.LstdFlags),
ProxyGRPC: "",
}
check.Start()
defer check.Stop()
// If ProxyGRPC is not set, check() reqs should go to check.GRPC
retry.Run(t, func(r *retry.R) {
if got, want := notif.Updates("foo"), 2; got < want {
r.Fatalf("got %d updates want at least %d", got, want)
}
if got, want := notif.State("foo"), api.HealthPassing; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}

View File

@ -22,7 +22,7 @@ import (
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-sockaddr/template"
"golang.org/x/time/rate"
)
@ -369,6 +369,8 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
proxyMaxPort := b.portVal("ports.proxy_max_port", c.Ports.ProxyMaxPort)
sidecarMinPort := b.portVal("ports.sidecar_min_port", c.Ports.SidecarMinPort)
sidecarMaxPort := b.portVal("ports.sidecar_max_port", c.Ports.SidecarMaxPort)
exposeMinPort := b.portVal("ports.expose_min_port", c.Ports.ExposeMinPort)
exposeMaxPort := b.portVal("ports.expose_max_port", c.Ports.ExposeMaxPort)
if proxyMaxPort < proxyMinPort {
return RuntimeConfig{}, fmt.Errorf(
"proxy_min_port must be less than proxy_max_port. To disable, set both to zero.")
@ -377,6 +379,10 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
return RuntimeConfig{}, fmt.Errorf(
"sidecar_min_port must be less than sidecar_max_port. To disable, set both to zero.")
}
if exposeMaxPort < exposeMinPort {
return RuntimeConfig{}, fmt.Errorf(
"expose_min_port must be less than expose_max_port. To disable, set both to zero.")
}
// determine the default bind and advertise address
//
@ -804,6 +810,8 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
ConnectCAConfig: connectCAConfig,
ConnectSidecarMinPort: sidecarMinPort,
ConnectSidecarMaxPort: sidecarMaxPort,
ExposeMinPort: exposeMinPort,
ExposeMaxPort: exposeMaxPort,
DataDir: b.stringVal(c.DataDir),
Datacenter: datacenter,
DevMode: b.boolVal(b.Flags.DevMode),
@ -1305,6 +1313,7 @@ func (b *Builder) serviceProxyVal(v *ServiceProxy) *structs.ConnectProxyConfig {
Config: v.Config,
Upstreams: b.upstreamsVal(v.Upstreams),
MeshGateway: b.meshGatewayConfVal(v.MeshGateway),
Expose: b.exposeConfVal(v.Expose),
}
}
@ -1345,6 +1354,30 @@ func (b *Builder) meshGatewayConfVal(mgConf *MeshGatewayConfig) structs.MeshGate
return cfg
}
func (b *Builder) exposeConfVal(v *ExposeConfig) structs.ExposeConfig {
var out structs.ExposeConfig
if v == nil {
return out
}
out.Checks = b.boolVal(v.Checks)
out.Paths = b.pathsVal(v.Paths)
return out
}
func (b *Builder) pathsVal(v []ExposePath) []structs.ExposePath {
paths := make([]structs.ExposePath, len(v))
for i, p := range v {
paths[i] = structs.ExposePath{
ListenerPort: b.intVal(p.ListenerPort),
Path: b.stringVal(p.Path),
LocalPathPort: b.intVal(p.LocalPathPort),
Protocol: b.stringVal(p.Protocol),
}
}
return paths
}
func (b *Builder) serviceConnectVal(v *ServiceConnect) *structs.ServiceConnect {
if v == nil {
return nil

View File

@ -6,7 +6,7 @@ import (
"strings"
"github.com/hashicorp/consul/lib"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/hcl"
"github.com/mitchellh/mapstructure"
)
@ -89,14 +89,20 @@ func Parse(data string, format string) (c Config, err error) {
"services.connect.proxy.config.upstreams", // Deprecated
"service.connect.proxy.upstreams",
"services.connect.proxy.upstreams",
"service.connect.proxy.expose.paths",
"services.connect.proxy.expose.paths",
"service.proxy.upstreams",
"services.proxy.upstreams",
"service.proxy.expose.paths",
"services.proxy.expose.paths",
// Need all the service(s) exceptions also for nested sidecar service.
"service.connect.sidecar_service.checks",
"services.connect.sidecar_service.checks",
"service.connect.sidecar_service.proxy.upstreams",
"services.connect.sidecar_service.proxy.upstreams",
"service.connect.sidecar_service.proxy.expose.paths",
"services.connect.sidecar_service.proxy.expose.paths",
}, []string{
"config_entries.bootstrap", // completely ignore this tree (fixed elsewhere)
})
@ -468,6 +474,9 @@ type ServiceProxy struct {
// Mesh Gateway Configuration
MeshGateway *MeshGatewayConfig `json:"mesh_gateway,omitempty" hcl:"mesh_gateway" mapstructure:"mesh_gateway"`
// Expose defines whether checks or paths are exposed through the proxy
Expose *ExposeConfig `json:"expose,omitempty" hcl:"expose" mapstructure:"expose"`
}
// Upstream represents a single upstream dependency for a service or proxy. It
@ -513,6 +522,34 @@ type MeshGatewayConfig struct {
Mode *string `json:"mode,omitempty" hcl:"mode" mapstructure:"mode"`
}
// ExposeConfig describes HTTP paths to expose through Envoy outside of Connect.
// Users can expose individual paths and/or all HTTP/GRPC paths for checks.
type ExposeConfig struct {
// Checks defines whether paths associated with Consul checks will be exposed.
// This flag triggers exposing all HTTP and GRPC check paths registered for the service.
Checks *bool `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"`
// Port defines the port of the proxy's listener for exposed paths.
Port *int `json:"port,omitempty" hcl:"port" mapstructure:"port"`
// Paths is the list of paths exposed through the proxy.
Paths []ExposePath `json:"paths,omitempty" hcl:"paths" mapstructure:"paths"`
}
type ExposePath struct {
// ListenerPort defines the port of the proxy's listener for exposed paths.
ListenerPort *int `json:"listener_port,omitempty" hcl:"listener_port" mapstructure:"listener_port"`
// Path is the path to expose through the proxy, ie. "/metrics."
Path *string `json:"path,omitempty" hcl:"path" mapstructure:"path"`
// Protocol describes the upstream's service protocol.
Protocol *string `json:"protocol,omitempty" hcl:"protocol" mapstructure:"protocol"`
// LocalPathPort is the port that the service is listening on for the given path.
LocalPathPort *int `json:"local_path_port,omitempty" hcl:"local_path_port" mapstructure:"local_path_port"`
}
// AutoEncrypt is the agent-global auto_encrypt configuration.
type AutoEncrypt struct {
// TLS enables receiving certificates for clients from servers
@ -606,6 +643,8 @@ type Ports struct {
ProxyMaxPort *int `json:"proxy_max_port,omitempty" hcl:"proxy_max_port" mapstructure:"proxy_max_port"`
SidecarMinPort *int `json:"sidecar_min_port,omitempty" hcl:"sidecar_min_port" mapstructure:"sidecar_min_port"`
SidecarMaxPort *int `json:"sidecar_max_port,omitempty" hcl:"sidecar_max_port" mapstructure:"sidecar_max_port"`
ExposeMinPort *int `json:"expose_min_port,omitempty" hcl:"expose_min_port" mapstructure:"expose_min_port"`
ExposeMaxPort *int `json:"expose_max_port,omitempty" hcl:"expose_max_port" mapstructure:"expose_max_port"`
}
type UnixSocket struct {

View File

@ -122,6 +122,8 @@ func DefaultSource() Source {
proxy_max_port = 20255
sidecar_min_port = 21000
sidecar_max_port = 21255
expose_min_port = 21500
expose_max_port = 21755
}
telemetry = {
metrics_prefix = "consul"

View File

@ -541,6 +541,14 @@ type RuntimeConfig struct {
// specified
ConnectSidecarMaxPort int
// ExposeMinPort is the inclusive start of the range of ports
// allocated to the agent for exposing checks through a proxy
ExposeMinPort int
// ExposeMinPort is the inclusive start of the range of ports
// allocated to the agent for exposing checks through a proxy
ExposeMaxPort int
// ConnectCAProvider is the type of CA provider to use with Connect.
ConnectCAProvider string

View File

@ -1295,6 +1295,36 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
rt.DataDir = dataDir
},
},
{
desc: "min/max ports for dynamic exposed listeners",
args: []string{`-data-dir=` + dataDir},
json: []string{`{
"ports": {
"expose_min_port": 1234,
"expose_max_port": 5678
}
}`},
hcl: []string{`
ports {
expose_min_port = 1234
expose_max_port = 5678
}
`},
patch: func(rt *RuntimeConfig) {
rt.ExposeMinPort = 1234
rt.ExposeMaxPort = 5678
rt.DataDir = dataDir
},
},
{
desc: "defaults for dynamic exposed listeners",
args: []string{`-data-dir=` + dataDir},
patch: func(rt *RuntimeConfig) {
rt.ExposeMinPort = 21500
rt.ExposeMaxPort = 21755
rt.DataDir = dataDir
},
},
// ------------------------------------------------------------
// precedence rules
@ -2338,6 +2368,17 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
}
],
"proxy": {
"expose": {
"checks": true,
"paths": [
{
"path": "/health",
"local_path_port": 8080,
"listener_port": 21500,
"protocol": "http"
}
]
},
"upstreams": [
{
"destination_name": "db",
@ -2363,6 +2404,17 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
}
]
proxy {
expose {
checks = true
paths = [
{
path = "/health"
local_path_port = 8080
listener_port = 21500
protocol = "http"
}
]
},
upstreams = [
{
destination_name = "db"
@ -2391,6 +2443,17 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
},
Proxy: &structs.ConnectProxyConfig{
Expose: structs.ExposeConfig{
Checks: true,
Paths: []structs.ExposePath{
{
Path: "/health",
LocalPathPort: 8080,
ListenerPort: 21500,
Protocol: "http",
},
},
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationType: "service",
@ -2434,6 +2497,17 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
}
],
"proxy": {
"expose": {
"checks": true,
"paths": [
{
"path": "/health",
"local_path_port": 8080,
"listener_port": 21500,
"protocol": "http"
}
]
},
"upstreams": [
{
"destination_name": "db",
@ -2459,6 +2533,17 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
}
]
proxy {
expose {
checks = true
paths = [
{
path = "/health"
local_path_port = 8080
listener_port = 21500
protocol = "http"
}
]
},
upstreams = [
{
destination_name = "db"
@ -2487,6 +2572,17 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
},
Proxy: &structs.ConnectProxyConfig{
Expose: structs.ExposeConfig{
Checks: true,
Paths: []structs.ExposePath{
{
Path: "/health",
LocalPathPort: 8080,
ListenerPort: 21500,
Protocol: "http",
},
},
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationType: "service",
@ -3627,7 +3723,9 @@ func TestFullConfig(t *testing.T) {
"server": 3757,
"grpc": 4881,
"sidecar_min_port": 8888,
"sidecar_max_port": 9999
"sidecar_max_port": 9999,
"expose_min_port": 1111,
"expose_max_port": 2222
},
"protocol": 30793,
"primary_datacenter": "ejtmd43d",
@ -3871,6 +3969,17 @@ func TestFullConfig(t *testing.T) {
"destination_service_name": "6L6BVfgH",
"local_service_address": "127.0.0.2",
"local_service_port": 23759,
"expose": {
"checks": true,
"paths": [
{
"path": "/health",
"local_path_port": 8080,
"listener_port": 21500,
"protocol": "http"
}
]
},
"upstreams": [
{
"destination_name": "KPtAj2cb",
@ -4205,8 +4314,8 @@ func TestFullConfig(t *testing.T) {
}
pid_file = "43xN80Km"
ports {
dns = 7001,
http = 7999,
dns = 7001
http = 7999
https = 15127
server = 3757
grpc = 4881
@ -4214,6 +4323,8 @@ func TestFullConfig(t *testing.T) {
proxy_max_port = 3000
sidecar_min_port = 8888
sidecar_max_port = 9999
expose_min_port = 1111
expose_max_port = 2222
}
protocol = 30793
primary_datacenter = "ejtmd43d"
@ -4472,6 +4583,17 @@ func TestFullConfig(t *testing.T) {
local_bind_address = "127.24.88.0"
},
]
expose {
checks = true
paths = [
{
path = "/health"
local_path_port = 8080
listener_port = 21500
protocol = "http"
}
]
}
}
},
{
@ -4797,6 +4919,8 @@ func TestFullConfig(t *testing.T) {
ConnectEnabled: true,
ConnectSidecarMinPort: 8888,
ConnectSidecarMaxPort: 9999,
ExposeMinPort: 1111,
ExposeMaxPort: 2222,
ConnectCAProvider: "consul",
ConnectCAConfig: map[string]interface{}{
"RotationPeriod": "90h",
@ -5044,6 +5168,17 @@ func TestFullConfig(t *testing.T) {
LocalBindAddress: "127.24.88.0",
},
},
Expose: structs.ExposeConfig{
Checks: true,
Paths: []structs.ExposePath{
{
Path: "/health",
LocalPathPort: 8080,
ListenerPort: 21500,
Protocol: "http",
},
},
},
},
Weights: &structs.Weights{
Passing: 1,
@ -5683,6 +5818,8 @@ func TestSanitize(t *testing.T) {
"EncryptKey": "hidden",
"EncryptVerifyIncoming": false,
"EncryptVerifyOutgoing": false,
"ExposeMaxPort": 0,
"ExposeMinPort": 0,
"GRPCAddrs": [],
"GRPCPort": 0,
"HTTPAddrs": [
@ -5763,6 +5900,8 @@ func TestSanitize(t *testing.T) {
"Name": "blurb",
"Notes": "",
"OutputMaxSize": ` + strconv.Itoa(checks.DefaultBufSize) + `,
"ProxyGRPC": "",
"ProxyHTTP": "",
"ScriptArgs": [],
"Shell": "",
"Status": "",

View File

@ -373,3 +373,61 @@ func TestConfig_Apply_Decoding(t *testing.T) {
}
})
}
func TestConfig_Apply_ProxyDefaultsExpose(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Create some config entries.
body := bytes.NewBuffer([]byte(`
{
"Kind": "proxy-defaults",
"Name": "global",
"Expose": {
"Checks": true,
"Paths": [
{
"LocalPathPort": 8080,
"ListenerPort": 21500,
"Path": "/healthz",
"Protocol": "http2"
}
]
}
}`))
req, _ := http.NewRequest("PUT", "/v1/config", body)
resp := httptest.NewRecorder()
_, err := a.srv.ConfigApply(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code, "!200 Response Code: %s", resp.Body.String())
// Get the remaining entry.
{
args := structs.ConfigEntryQuery{
Kind: structs.ProxyDefaults,
Name: "global",
Datacenter: "dc1",
}
var out structs.ConfigEntryResponse
require.NoError(t, a.RPC("ConfigEntry.Get", &args, &out))
require.NotNil(t, out.Entry)
entry := out.Entry.(*structs.ProxyConfigEntry)
expose := structs.ExposeConfig{
Checks: true,
Paths: []structs.ExposePath{
{
LocalPathPort: 8080,
ListenerPort: 21500,
Path: "/healthz",
Protocol: "http2",
},
},
}
require.Equal(t, expose, entry.Expose)
}
}

View File

@ -275,11 +275,18 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
}
reply.ProxyConfig = mapCopy.(map[string]interface{})
reply.MeshGateway = proxyConf.MeshGateway
reply.Expose = proxyConf.Expose
}
reply.Index = index
if serviceConf != nil {
if serviceConf.Expose.Checks {
reply.Expose.Checks = true
}
if len(serviceConf.Expose.Paths) >= 1 {
reply.Expose.Paths = serviceConf.Expose.Paths
}
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
reply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
}

View File

@ -1127,3 +1127,46 @@ operator = "write"
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
}
func TestConfigEntry_ProxyDefaultsExposeConfig(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
expose := structs.ExposeConfig{
Checks: true,
Paths: []structs.ExposePath{
{
LocalPathPort: 8080,
ListenerPort: 21500,
Protocol: "http2",
Path: "/healthz",
},
},
}
args := structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ProxyConfigEntry{
Kind: "proxy-defaults",
Name: "global",
Expose: expose,
},
}
out := false
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
require.True(t, out)
state := s1.fsm.State()
_, entry, err := state.ConfigEntry(nil, structs.ProxyDefaults, "global")
require.NoError(t, err)
proxyConf, ok := entry.(*structs.ProxyConfigEntry)
require.True(t, ok)
require.Equal(t, expose, proxyConf.Expose)
}

View File

@ -13,11 +13,6 @@ import (
"github.com/hashicorp/consul/agent/structs"
)
const (
// maxQueryTime is used to bound the limit of a blocking query
maxQueryTime = 600 * time.Second
)
// EventFire is used to fire a new event
func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

View File

@ -50,7 +50,7 @@ type ServiceState struct {
// but has not been removed on the server yet.
Deleted bool
// WatchCh is closed when the service state changes suitable for use in a
// WatchCh is closed when the service state changes. Suitable for use in a
// memdb.WatchSet when watching agent local changes with hash-based blocking.
WatchCh chan struct{}
}
@ -366,7 +366,7 @@ func (l *State) SetServiceState(s *ServiceState) {
}
func (l *State) setServiceStateLocked(s *ServiceState) {
s.WatchCh = make(chan struct{})
s.WatchCh = make(chan struct{}, 1)
old, hasOld := l.services[s.Service.ID]
l.services[s.Service.ID] = s

View File

@ -206,7 +206,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
UpstreamEndpoints: map[string]structs.CheckServiceNodes{},
UpstreamEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[string][]structs.CheckType{},
},
Datacenter: "dc1",
},
@ -250,7 +251,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
UpstreamEndpoints: map[string]structs.CheckServiceNodes{},
UpstreamEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[string][]structs.CheckType{},
},
Datacenter: "dc1",
},

View File

@ -14,6 +14,7 @@ type configSnapshotConnectProxy struct {
WatchedUpstreamEndpoints map[string]map[string]structs.CheckServiceNodes
WatchedGateways map[string]map[string]context.CancelFunc
WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes
WatchedServiceChecks map[string][]structs.CheckType
UpstreamEndpoints map[string]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints
}

View File

@ -29,6 +29,7 @@ const (
serviceListWatchID = "service-list"
datacentersWatchID = "datacenters"
serviceResolversWatchID = "service-resolvers"
svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":"
serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":"
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"
defaultPreparedQueryPollInterval = 30 * time.Second
@ -215,6 +216,14 @@ func (s *state) initWatchesConnectProxy() error {
return err
}
// Watch for service check updates
err = s.cache.Notify(s.ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{
ServiceID: s.proxyCfg.DestinationServiceID,
}, svcChecksWatchIDPrefix+s.proxyCfg.DestinationServiceID, s.ch)
if err != nil {
return err
}
// TODO(namespaces): pull this from something like s.source.Namespace?
currentNamespace := "default"
@ -353,6 +362,7 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedServiceChecks = make(map[string][]structs.CheckType)
snap.ConnectProxy.UpstreamEndpoints = make(map[string]structs.CheckServiceNodes) // TODO(rb): deprecated
case structs.ServiceKindMeshGateway:
@ -541,6 +551,14 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
snap.ConnectProxy.UpstreamEndpoints[pq] = resp.Nodes
case strings.HasPrefix(u.CorrelationID, svcChecksWatchIDPrefix):
resp, ok := u.Result.([]structs.CheckType)
if !ok {
return fmt.Errorf("invalid type for service checks response: %T, want: []structs.CheckType", u.Result)
}
svcID := strings.TrimPrefix(u.CorrelationID, svcChecksWatchIDPrefix)
snap.ConnectProxy.WatchedServiceChecks[svcID] = resp
default:
return errors.New("unknown correlation ID")
}

View File

@ -20,12 +20,13 @@ import (
// TestCacheTypes encapsulates all the different cache types proxycfg.State will
// watch/request for controlling one during testing.
type TestCacheTypes struct {
roots *ControllableCacheType
leaf *ControllableCacheType
intentions *ControllableCacheType
health *ControllableCacheType
query *ControllableCacheType
compiledChain *ControllableCacheType
roots *ControllableCacheType
leaf *ControllableCacheType
intentions *ControllableCacheType
health *ControllableCacheType
query *ControllableCacheType
compiledChain *ControllableCacheType
serviceHTTPChecks *ControllableCacheType
}
// NewTestCacheTypes creates a set of ControllableCacheTypes for all types that
@ -33,12 +34,13 @@ type TestCacheTypes struct {
func NewTestCacheTypes(t testing.T) *TestCacheTypes {
t.Helper()
ct := &TestCacheTypes{
roots: NewControllableCacheType(t),
leaf: NewControllableCacheType(t),
intentions: NewControllableCacheType(t),
health: NewControllableCacheType(t),
query: NewControllableCacheType(t),
compiledChain: NewControllableCacheType(t),
roots: NewControllableCacheType(t),
leaf: NewControllableCacheType(t),
intentions: NewControllableCacheType(t),
health: NewControllableCacheType(t),
query: NewControllableCacheType(t),
compiledChain: NewControllableCacheType(t),
serviceHTTPChecks: NewControllableCacheType(t),
}
ct.query.blocking = false
return ct
@ -76,6 +78,7 @@ func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache {
RefreshTimer: 0,
RefreshTimeout: 10 * time.Minute,
})
c.RegisterType(cachetype.ServiceHTTPChecksName, types.serviceHTTPChecks, &cache.RegisterOptions{})
return c
}
@ -1005,6 +1008,35 @@ func TestConfigSnapshotMeshGateway(t testing.T) *ConfigSnapshot {
}
}
func TestConfigSnapshotExposeConfig(t testing.T) *ConfigSnapshot {
return &ConfigSnapshot{
Kind: structs.ServiceKindConnectProxy,
Service: "web-proxy",
ProxyID: "web-proxy",
Address: "1.2.3.4",
Port: 8080,
Proxy: structs.ConnectProxyConfig{
LocalServicePort: 8080,
Expose: structs.ExposeConfig{
Checks: false,
Paths: []structs.ExposePath{
{
LocalPathPort: 8080,
Path: "/health1",
ListenerPort: 21500,
},
{
LocalPathPort: 8080,
Path: "/health2",
ListenerPort: 21501,
},
},
},
},
Datacenter: "dc1",
}
}
// ControllableCacheType is a cache.Type that simulates a typical blocking RPC
// but lets us control the responses and when they are delivered easily.
type ControllableCacheType struct {

View File

@ -0,0 +1,110 @@
package agent
import (
"context"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/require"
"testing"
"time"
)
// Integration test for ServiceHTTPBasedChecks cache-type
// Placed in agent pkg rather than cache-types to avoid circular dependency when importing agent.TestAgent
func TestAgent_ServiceHTTPChecksNotification(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
service := structs.NodeService{
ID: "web",
Service: "web",
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan cache.UpdateEvent)
// Watch for service check updates
err := a.cache.Notify(ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{
ServiceID: service.ID,
}, "service-checks:"+service.ID, ch)
if err != nil {
t.Fatalf("failed to set cache notification: %v", err)
}
chkTypes := []*structs.CheckType{
{
CheckID: "http-check",
HTTP: "localhost:8080/health",
Interval: 5 * time.Second,
OutputMaxSize: checks.DefaultBufSize,
},
{
CheckID: "grpc-check",
GRPC: "localhost:9090/v1.Health",
Interval: 5 * time.Second,
},
{
CheckID: "ttl-check",
TTL: 10 * time.Second,
},
}
// Adding TTL type should lead to a timeout, since only HTTP-based checks are watched
if err := a.AddService(&service, chkTypes[2:], false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add service: %v", err)
}
var val cache.UpdateEvent
select {
case val = <-ch:
t.Fatal("got cache update for TTL check, expected timeout")
case <-time.After(100 * time.Millisecond):
}
// Adding service with HTTP checks should lead notification for them
if err := a.AddService(&service, chkTypes[0:2], false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add service: %v", err)
}
select {
case val = <-ch:
case <-time.After(100 * time.Millisecond):
t.Fatal("didn't get cache update event")
}
got, ok := val.Result.([]structs.CheckType)
if !ok {
t.Fatalf("notified of result of wrong type, got %T, want []structs.CheckType", got)
}
want := chkTypes[0:2]
for i, c := range want {
require.Equal(t, *c, got[i])
}
// Removing the GRPC check should leave only the HTTP check
if err := a.RemoveCheck(chkTypes[1].CheckID, false); err != nil {
t.Fatalf("failed to remove check: %v", err)
}
select {
case val = <-ch:
case <-time.After(100 * time.Millisecond):
t.Fatal("didn't get cache update event")
}
got, ok = val.Result.([]structs.CheckType)
if !ok {
t.Fatalf("notified of result of wrong type, got %T, want []structs.CheckType", got)
}
want = chkTypes[0:1]
for i, c := range want {
require.Equal(t, *c, got[i])
}
}

View File

@ -502,6 +502,10 @@ func (w *serviceConfigWatch) mergeServiceConfig() (*structs.NodeService, error)
return nil, err
}
if err := mergo.Merge(&ns.Proxy.Expose, w.defaults.Expose); err != nil {
return nil, err
}
if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault {
ns.Proxy.MeshGateway.Mode = w.defaults.MeshGateway.Mode
}

View File

@ -13,6 +13,8 @@ import (
// HTTP, Docker, TCP and GRPC all require Interval. Only one of the types may
// to be provided: TTL or Script/Interval or HTTP/Interval or TCP/Interval or
// Docker/Interval or GRPC/Interval or AliasService.
// Since types like CheckHTTP and CheckGRPC derive from CheckType, there are
// helper conversion methods that do the reverse conversion. ie. checkHTTP.CheckType()
type CheckType struct {
// fields already embedded in CheckDefinition
// Note: CheckType.CheckID == CheckDefinition.ID
@ -41,6 +43,10 @@ type CheckType struct {
Timeout time.Duration
TTL time.Duration
// Definition fields used when exposing checks through a proxy
ProxyHTTP string
ProxyGRPC string
// DeregisterCriticalServiceAfter, if >0, will cause the associated
// service, if any, to be deregistered if this check is critical for
// longer than this duration.

View File

@ -51,6 +51,7 @@ type ServiceConfigEntry struct {
Name string
Protocol string
MeshGateway MeshGatewayConfig `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"`
ExternalSNI string `json:",omitempty"`
@ -116,6 +117,7 @@ type ProxyConfigEntry struct {
Name string
Config map[string]interface{}
MeshGateway MeshGatewayConfig `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"`
RaftIndex
}
@ -297,10 +299,15 @@ func DecodeConfigEntry(raw map[string]interface{}) (ConfigEntry, error) {
func ConfigEntryDecodeRulesForKind(kind string) (skipWhenPatching []string, translateKeysDict map[string]string, err error) {
switch kind {
case ProxyDefaults:
return nil, map[string]string{
"mesh_gateway": "meshgateway",
"config": "",
}, nil
return []string{
"expose.paths",
"Expose.Paths",
}, map[string]string{
"local_path_port": "localpathport",
"listener_port": "listenerport",
"mesh_gateway": "meshgateway",
"config": "",
}, nil
case ServiceDefaults:
return nil, map[string]string{
"mesh_gateway": "meshgateway",
@ -534,6 +541,7 @@ type ServiceConfigResponse struct {
ProxyConfig map[string]interface{}
UpstreamConfigs map[string]map[string]interface{}
MeshGateway MeshGatewayConfig `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"`
QueryMeta
}

View File

@ -3,11 +3,17 @@ package structs
import (
"encoding/json"
"fmt"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"log"
)
const (
defaultExposeProtocol = "http"
)
var allowedExposeProtocols = map[string]bool{"http": true, "http2": true}
type MeshGatewayMode string
const (
@ -109,6 +115,9 @@ type ConnectProxyConfig struct {
// MeshGateway defines the mesh gateway configuration for this upstream
MeshGateway MeshGatewayConfig `json:",omitempty"`
// Expose defines whether checks or paths are exposed through the proxy
Expose ExposeConfig `json:",omitempty"`
}
func (c *ConnectProxyConfig) MarshalJSON() ([]byte, error) {
@ -137,6 +146,7 @@ func (c *ConnectProxyConfig) ToAPI() *api.AgentServiceConnectProxyConfig {
Config: c.Config,
Upstreams: c.Upstreams.ToAPI(),
MeshGateway: c.MeshGateway.ToAPI(),
Expose: c.Expose.ToAPI(),
}
}
@ -312,3 +322,68 @@ func UpstreamFromAPI(u api.Upstream) Upstream {
Config: u.Config,
}
}
// ExposeConfig describes HTTP paths to expose through Envoy outside of Connect.
// Users can expose individual paths and/or all HTTP/GRPC paths for checks.
type ExposeConfig struct {
// Checks defines whether paths associated with Consul checks will be exposed.
// This flag triggers exposing all HTTP and GRPC check paths registered for the service.
Checks bool `json:",omitempty"`
// Paths is the list of paths exposed through the proxy.
Paths []ExposePath `json:",omitempty"`
}
type ExposePath struct {
// ListenerPort defines the port of the proxy's listener for exposed paths.
ListenerPort int `json:",omitempty"`
// ExposePath is the path to expose through the proxy, ie. "/metrics."
Path string `json:",omitempty"`
// LocalPathPort is the port that the service is listening on for the given path.
LocalPathPort int `json:",omitempty"`
// Protocol describes the upstream's service protocol.
// Valid values are "http" and "http2", defaults to "http"
Protocol string `json:",omitempty"`
// ParsedFromCheck is set if this path was parsed from a registered check
ParsedFromCheck bool
}
func (e *ExposeConfig) ToAPI() api.ExposeConfig {
paths := make([]api.ExposePath, 0)
for _, p := range e.Paths {
paths = append(paths, p.ToAPI())
}
if e.Paths == nil {
paths = nil
}
return api.ExposeConfig{
Checks: e.Checks,
Paths: paths,
}
}
func (p *ExposePath) ToAPI() api.ExposePath {
return api.ExposePath{
ListenerPort: p.ListenerPort,
Path: p.Path,
LocalPathPort: p.LocalPathPort,
Protocol: p.Protocol,
ParsedFromCheck: p.ParsedFromCheck,
}
}
// Finalize validates ExposeConfig and sets default values
func (e *ExposeConfig) Finalize(l *log.Logger) {
for i := 0; i < len(e.Paths); i++ {
path := &e.Paths[i]
if path.Protocol == "" {
path.Protocol = defaultExposeProtocol
}
}
}

View File

@ -54,6 +54,7 @@ func (s *ServiceDefinition) NodeService() *NodeService {
ns.Proxy.Upstreams[i].DestinationType = UpstreamDestTypeService
}
}
ns.Proxy.Expose = s.Proxy.Expose
}
if ns.ID == "" && ns.Service != "" {
ns.ID = ns.Service

View File

@ -14,7 +14,7 @@ import (
"time"
"github.com/hashicorp/go-msgpack/codec"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/serf/coordinate"
"github.com/mitchellh/hashstructure"
@ -974,6 +974,39 @@ func (s *NodeService) Validate() error {
}
bindAddrs[addr] = struct{}{}
}
var knownPaths = make(map[string]bool)
var knownListeners = make(map[int]bool)
for _, path := range s.Proxy.Expose.Paths {
if path.Path == "" {
result = multierror.Append(result, fmt.Errorf("expose.paths: empty path exposed"))
}
if seen := knownPaths[path.Path]; seen {
result = multierror.Append(result, fmt.Errorf("expose.paths: duplicate paths exposed"))
}
knownPaths[path.Path] = true
if seen := knownListeners[path.ListenerPort]; seen {
result = multierror.Append(result, fmt.Errorf("expose.paths: duplicate listener ports exposed"))
}
knownListeners[path.ListenerPort] = true
if path.ListenerPort <= 0 || path.ListenerPort > 65535 {
result = multierror.Append(result, fmt.Errorf("expose.paths: invalid listener port: %d", path.ListenerPort))
}
path.Protocol = strings.ToLower(path.Protocol)
if ok := allowedExposeProtocols[path.Protocol]; !ok && path.Protocol != "" {
protocols := make([]string, 0)
for p, _ := range allowedExposeProtocols {
protocols = append(protocols, p)
}
result = multierror.Append(result,
fmt.Errorf("protocol '%s' not supported for path: %s, must be in: %v",
path.Protocol, path.Path, protocols))
}
}
}
// MeshGateway validation
@ -1150,6 +1183,14 @@ type HealthCheckDefinition struct {
OutputMaxSize uint `json:",omitempty"`
Timeout time.Duration `json:",omitempty"`
DeregisterCriticalServiceAfter time.Duration `json:",omitempty"`
ScriptArgs []string `json:",omitempty"`
DockerContainerID string `json:",omitempty"`
Shell string `json:",omitempty"`
GRPC string `json:",omitempty"`
GRPCUseTLS bool `json:",omitempty"`
AliasNode string `json:",omitempty"`
AliasService string `json:",omitempty"`
TTL time.Duration `json:",omitempty"`
}
func (d *HealthCheckDefinition) MarshalJSON() ([]byte, error) {

View File

@ -60,6 +60,47 @@ var expectedFieldConfigMeshGatewayConfig bexpr.FieldConfigurations = bexpr.Field
},
}
var expectedFieldConfigExposeConfig bexpr.FieldConfigurations = bexpr.FieldConfigurations{
"Checks": &bexpr.FieldConfiguration{
StructFieldName: "Checks",
CoerceFn: bexpr.CoerceBool,
SupportedOperations: []bexpr.MatchOperator{bexpr.MatchEqual, bexpr.MatchNotEqual},
},
"Paths": &bexpr.FieldConfiguration{
StructFieldName: "Paths",
SupportedOperations: []bexpr.MatchOperator{bexpr.MatchIsEmpty, bexpr.MatchIsNotEmpty},
SubFields: expectedFieldConfigPaths,
},
}
var expectedFieldConfigPaths bexpr.FieldConfigurations = bexpr.FieldConfigurations{
"ListenerPort": &bexpr.FieldConfiguration{
StructFieldName: "ListenerPort",
CoerceFn: bexpr.CoerceInt,
SupportedOperations: []bexpr.MatchOperator{bexpr.MatchEqual, bexpr.MatchNotEqual},
},
"Path": &bexpr.FieldConfiguration{
StructFieldName: "Path",
CoerceFn: bexpr.CoerceString,
SupportedOperations: []bexpr.MatchOperator{bexpr.MatchEqual, bexpr.MatchNotEqual, bexpr.MatchIn, bexpr.MatchNotIn, bexpr.MatchMatches, bexpr.MatchNotMatches},
},
"LocalPathPort": &bexpr.FieldConfiguration{
StructFieldName: "LocalPathPort",
CoerceFn: bexpr.CoerceInt,
SupportedOperations: []bexpr.MatchOperator{bexpr.MatchEqual, bexpr.MatchNotEqual},
},
"Protocol": &bexpr.FieldConfiguration{
StructFieldName: "Protocol",
CoerceFn: bexpr.CoerceString,
SupportedOperations: []bexpr.MatchOperator{bexpr.MatchEqual, bexpr.MatchNotEqual, bexpr.MatchIn, bexpr.MatchNotIn, bexpr.MatchMatches, bexpr.MatchNotMatches},
},
"ParsedFromCheck": &bexpr.FieldConfiguration{
StructFieldName: "ParsedFromCheck",
CoerceFn: bexpr.CoerceBool,
SupportedOperations: []bexpr.MatchOperator{bexpr.MatchEqual, bexpr.MatchNotEqual},
},
}
var expectedFieldConfigUpstreams bexpr.FieldConfigurations = bexpr.FieldConfigurations{
"DestinationType": &bexpr.FieldConfiguration{
StructFieldName: "DestinationType",
@ -127,6 +168,10 @@ var expectedFieldConfigConnectProxyConfig bexpr.FieldConfigurations = bexpr.Fiel
StructFieldName: "MeshGateway",
SubFields: expectedFieldConfigMeshGatewayConfig,
},
"Expose": &bexpr.FieldConfiguration{
StructFieldName: "Expose",
SubFields: expectedFieldConfigExposeConfig,
},
}
var expectedFieldConfigServiceConnect bexpr.FieldConfigurations = bexpr.FieldConfigurations{

View File

@ -414,6 +414,63 @@ func TestStructs_NodeService_ValidateMeshGateway(t *testing.T) {
}
}
func TestStructs_NodeService_ValidateExposeConfig(t *testing.T) {
type testCase struct {
Modify func(*NodeService)
Err string
}
cases := map[string]testCase{
"valid": {
func(x *NodeService) {},
"",
},
"empty path": {
func(x *NodeService) { x.Proxy.Expose.Paths[0].Path = "" },
"empty path exposed",
},
"invalid port negative": {
func(x *NodeService) { x.Proxy.Expose.Paths[0].ListenerPort = -1 },
"invalid listener port",
},
"invalid port too large": {
func(x *NodeService) { x.Proxy.Expose.Paths[0].ListenerPort = 65536 },
"invalid listener port",
},
"duplicate paths": {
func(x *NodeService) {
x.Proxy.Expose.Paths[0].Path = "/metrics"
x.Proxy.Expose.Paths[1].Path = "/metrics"
},
"duplicate paths exposed",
},
"duplicate ports": {
func(x *NodeService) {
x.Proxy.Expose.Paths[0].ListenerPort = 21600
x.Proxy.Expose.Paths[1].ListenerPort = 21600
},
"duplicate listener ports exposed",
},
"protocol not supported": {
func(x *NodeService) { x.Proxy.Expose.Paths[0].Protocol = "foo" },
"protocol 'foo' not supported for path",
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ns := TestNodeServiceExpose(t)
tc.Modify(ns)
err := ns.Validate()
if tc.Err == "" {
require.NoError(t, err)
} else {
require.Contains(t, strings.ToLower(err.Error()), strings.ToLower(tc.Err))
}
})
}
}
func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
cases := []struct {
Name string

View File

@ -50,6 +50,32 @@ func TestNodeServiceProxy(t testing.T) *NodeService {
}
}
func TestNodeServiceExpose(t testing.T) *NodeService {
return &NodeService{
Kind: ServiceKindConnectProxy,
Service: "test-svc",
Address: "localhost",
Port: 8080,
Proxy: ConnectProxyConfig{
DestinationServiceName: "web",
Expose: ExposeConfig{
Paths: []ExposePath{
{
Path: "/foo",
LocalPathPort: 8080,
ListenerPort: 21500,
},
{
Path: "/bar",
LocalPathPort: 8080,
ListenerPort: 21501,
},
},
},
},
}
}
// TestNodeServiceMeshGateway returns a *NodeService representing a valid Mesh Gateway
func TestNodeServiceMeshGateway(t testing.T) *NodeService {
return TestNodeServiceMeshGatewayWithAddrs(t,

View File

@ -44,7 +44,7 @@ func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapsh
clusters := make([]proto.Message, 0, len(cfgSnap.Proxy.Upstreams)+1)
// Include the "app" cluster for the public listener
appCluster, err := s.makeAppCluster(cfgSnap)
appCluster, err := s.makeAppCluster(cfgSnap, LocalAppClusterName, "", cfgSnap.Proxy.LocalServicePort)
if err != nil {
return nil, err
}
@ -74,9 +74,40 @@ func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapsh
}
}
cfgSnap.Proxy.Expose.Finalize(s.Logger)
paths := cfgSnap.Proxy.Expose.Paths
// Add service health checks to the list of paths to create clusters for if needed
if cfgSnap.Proxy.Expose.Checks {
for _, check := range s.CheckFetcher.ServiceHTTPBasedChecks(cfgSnap.Proxy.DestinationServiceID) {
p, err := parseCheckPath(check)
if err != nil {
s.Logger.Printf("[WARN] envoy: failed to create cluster for check '%s': %v", check.CheckID, err)
continue
}
paths = append(paths, p)
}
}
// Create a new cluster if we need to expose a port that is different from the service port
for _, path := range paths {
if path.LocalPathPort == cfgSnap.Proxy.LocalServicePort {
continue
}
c, err := s.makeAppCluster(cfgSnap, makeExposeClusterName(path.LocalPathPort), path.Protocol, path.LocalPathPort)
if err != nil {
s.Logger.Printf("[WARN] envoy: failed to make local cluster for '%s': %s", path.Path, err)
continue
}
clusters = append(clusters, c)
}
return clusters, nil
}
func makeExposeClusterName(destinationPort int) string {
return fmt.Sprintf("exposed_cluster_%d", destinationPort)
}
// clustersFromSnapshotMeshGateway returns the xDS API representation of the "clusters"
// for a mesh gateway. This will include 1 cluster per remote datacenter as well as
// 1 cluster for each service subset.
@ -122,7 +153,7 @@ func (s *Server) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsho
return clusters, nil
}
func (s *Server) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
func (s *Server) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot, name, pathProtocol string, port int) (*envoy.Cluster, error) {
var c *envoy.Cluster
var err error
@ -143,23 +174,23 @@ func (s *Server) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluste
addr = "127.0.0.1"
}
c = &envoy.Cluster{
Name: LocalAppClusterName,
Name: name,
ConnectTimeout: time.Duration(cfg.LocalConnectTimeoutMs) * time.Millisecond,
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_STATIC},
LoadAssignment: &envoy.ClusterLoadAssignment{
ClusterName: LocalAppClusterName,
ClusterName: name,
Endpoints: []envoyendpoint.LocalityLbEndpoints{
{
LbEndpoints: []envoyendpoint.LbEndpoint{
makeEndpoint(LocalAppClusterName,
makeEndpoint(name,
addr,
cfgSnap.Proxy.LocalServicePort),
port),
},
},
},
},
}
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" || pathProtocol == "http2" {
c.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
}

View File

@ -176,6 +176,22 @@ func TestClustersFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshotDiscoveryChain_SplitterWithResolverRedirectMultiDC,
setup: nil,
},
{
name: "expose-paths-local-app-paths",
create: proxycfg.TestConfigSnapshotExposeConfig,
},
{
name: "expose-paths-new-cluster-http2",
create: proxycfg.TestConfigSnapshotExposeConfig,
setup: func(snap *proxycfg.ConfigSnapshot) {
snap.Proxy.Expose.Paths[1] = structs.ExposePath{
LocalPathPort: 9090,
Path: "/grpc.health.v1.Health/Check",
ListenerPort: 21501,
Protocol: "http2",
}
},
},
{
name: "mesh-gateway",
create: proxycfg.TestConfigSnapshotMeshGateway,

View File

@ -139,7 +139,7 @@ func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, er
return cfg, err
}
// ParseUpstreamConfig returns the UpstreamConfig parsed from the an opaque map.
// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map.
// If an error occurs during parsing it is returned along with the default
// config this allows caller to choose whether and how to report the error.
func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) {

View File

@ -4,6 +4,10 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"net/url"
"regexp"
"strconv"
"strings"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
@ -71,16 +75,116 @@ func (s *Server) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
}
resources[i+1] = upstreamListener
}
cfgSnap.Proxy.Expose.Finalize(s.Logger)
paths := cfgSnap.Proxy.Expose.Paths
// Add service health checks to the list of paths to create listeners for if needed
if cfgSnap.Proxy.Expose.Checks {
for _, check := range s.CheckFetcher.ServiceHTTPBasedChecks(cfgSnap.Proxy.DestinationServiceID) {
p, err := parseCheckPath(check)
if err != nil {
s.Logger.Printf("[WARN] envoy: failed to create listener for check '%s': %v", check.CheckID, err)
continue
}
paths = append(paths, p)
}
}
// Configure additional listener for exposed check paths
for _, path := range paths {
clusterName := LocalAppClusterName
if path.LocalPathPort != cfgSnap.Proxy.LocalServicePort {
clusterName = makeExposeClusterName(path.LocalPathPort)
}
l, err := s.makeExposedCheckListener(cfgSnap, clusterName, path)
if err != nil {
return nil, err
}
resources = append(resources, l)
}
return resources, nil
}
func parseCheckPath(check structs.CheckType) (structs.ExposePath, error) {
var path structs.ExposePath
if check.HTTP != "" {
path.Protocol = "http"
// Get path and local port from original HTTP target
u, err := url.Parse(check.HTTP)
if err != nil {
return path, fmt.Errorf("failed to parse url '%s': %v", check.HTTP, err)
}
path.Path = u.Path
_, portStr, err := net.SplitHostPort(u.Host)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.HTTP, err)
}
path.LocalPathPort, err = strconv.Atoi(portStr)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.HTTP, err)
}
// Get listener port from proxied HTTP target
u, err = url.Parse(check.ProxyHTTP)
if err != nil {
return path, fmt.Errorf("failed to parse url '%s': %v", check.ProxyHTTP, err)
}
_, portStr, err = net.SplitHostPort(u.Host)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.ProxyHTTP, err)
}
path.ListenerPort, err = strconv.Atoi(portStr)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.ProxyHTTP, err)
}
}
if check.GRPC != "" {
path.Path = "/grpc.health.v1.Health/Check"
path.Protocol = "http2"
// Get local port from original GRPC target of the form: host/service
proxyServerAndService := strings.SplitN(check.GRPC, "/", 2)
_, portStr, err := net.SplitHostPort(proxyServerAndService[0])
if err != nil {
return path, fmt.Errorf("failed to split host/port from '%s': %v", check.GRPC, err)
}
path.LocalPathPort, err = strconv.Atoi(portStr)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.GRPC, err)
}
// Get listener port from proxied GRPC target of the form: host/service
proxyServerAndService = strings.SplitN(check.ProxyGRPC, "/", 2)
_, portStr, err = net.SplitHostPort(proxyServerAndService[0])
if err != nil {
return path, fmt.Errorf("failed to split host/port from '%s': %v", check.ProxyGRPC, err)
}
path.ListenerPort, err = strconv.Atoi(portStr)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.ProxyGRPC, err)
}
}
path.ParsedFromCheck = true
return path, nil
}
// listenersFromSnapshotMeshGateway returns the "listener" for a mesh-gateway service
func (s *Server) listenersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
cfg, err := ParseMeshGatewayConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Printf("[WARN] envoy: failed to parse Connect.Proxy.Config: %s", err)
s.Logger.Printf("[WARN] envoy: failed to parse Connect.Proxy.Config: %v", err)
}
// TODO - prevent invalid configurations of binding to the same port/addr
@ -221,7 +325,7 @@ func (s *Server) makePublicListener(cfgSnap *proxycfg.ConfigSnapshot, token stri
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Printf("[WARN] envoy: failed to parse Connect.Proxy.Config: %s", err)
s.Logger.Printf("[WARN] envoy: failed to parse Connect.Proxy.Config: %v", err)
}
if cfg.PublicListenerJSON != "" {
@ -253,7 +357,8 @@ func (s *Server) makePublicListener(cfgSnap *proxycfg.ConfigSnapshot, token stri
l = makeListener(PublicListenerName, addr, port)
filter, err := makeListenerFilter(false, cfg.Protocol, "public_listener", LocalAppClusterName, "", true)
filter, err := makeListenerFilter(
false, cfg.Protocol, "public_listener", LocalAppClusterName, "", "", true)
if err != nil {
return nil, err
}
@ -270,6 +375,69 @@ func (s *Server) makePublicListener(cfgSnap *proxycfg.ConfigSnapshot, token stri
return l, err
}
func (s *Server) makeExposedCheckListener(cfgSnap *proxycfg.ConfigSnapshot, cluster string, path structs.ExposePath) (proto.Message, error) {
cfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Printf("[WARN] envoy: failed to parse Connect.Proxy.Config: %v", err)
}
// No user config, use default listener
addr := cfgSnap.Address
// Override with bind address if one is set, otherwise default to 0.0.0.0
if cfg.BindAddress != "" {
addr = cfg.BindAddress
} else if addr == "" {
addr = "0.0.0.0"
}
// Strip any special characters from path to make a valid and hopefully unique name
r := regexp.MustCompile(`[^a-zA-Z0-9]+`)
strippedPath := r.ReplaceAllString(path.Path, "")
listenerName := fmt.Sprintf("exposed_path_%s", strippedPath)
l := makeListener(listenerName, addr, path.ListenerPort)
filterName := fmt.Sprintf("exposed_path_filter_%s_%d", strippedPath, path.ListenerPort)
f, err := makeListenerFilter(false, path.Protocol, filterName, cluster, "", path.Path, true)
if err != nil {
return nil, err
}
chain := envoylistener.FilterChain{
Filters: []envoylistener.Filter{f},
}
// For registered checks restrict traffic sources to localhost and Consul's advertise addr
if path.ParsedFromCheck {
// For the advertise addr we use a CidrRange that only matches one address
advertise := s.CfgFetcher.AdvertiseAddrLAN()
// Get prefix length based on whether address is ipv4 (32 bits) or ipv6 (128 bits)
advertiseLen := 32
ip := net.ParseIP(advertise)
if ip != nil && strings.Contains(advertise, ":") {
advertiseLen = 128
}
chain.FilterChainMatch = &envoylistener.FilterChainMatch{
SourcePrefixRanges: []*envoycore.CidrRange{
{AddressPrefix: "127.0.0.1", PrefixLen: &types.UInt32Value{Value: 8}},
{AddressPrefix: "::1", PrefixLen: &types.UInt32Value{Value: 128}},
{AddressPrefix: advertise, PrefixLen: &types.UInt32Value{Value: uint32(advertiseLen)}},
},
}
}
l.FilterChains = []envoylistener.FilterChain{chain}
return l, err
}
// makeUpstreamListenerIgnoreDiscoveryChain counterintuitively takes an (optional) chain
func (s *Server) makeUpstreamListenerIgnoreDiscoveryChain(
u *structs.Upstream,
@ -303,7 +471,8 @@ func (s *Server) makeUpstreamListenerIgnoreDiscoveryChain(
clusterName := CustomizeClusterName(sni, chain)
l := makeListener(upstreamID, addr, u.LocalBindPort)
filter, err := makeListenerFilter(false, cfg.Protocol, upstreamID, clusterName, "upstream_", false)
filter, err := makeListenerFilter(
false, cfg.Protocol, upstreamID, clusterName, "upstream_", "", false)
if err != nil {
return nil, err
}
@ -408,7 +577,8 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
proto = "tcp"
}
filter, err := makeListenerFilter(true, proto, upstreamID, "", "upstream_", false)
filter, err := makeListenerFilter(
true, proto, upstreamID, "", "upstream_", "", false)
if err != nil {
return nil, err
}
@ -423,14 +593,17 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
return l, nil
}
func makeListenerFilter(useRDS bool, protocol, filterName, cluster, statPrefix string, ingress bool) (envoylistener.Filter, error) {
func makeListenerFilter(
useRDS bool,
protocol, filterName, cluster, statPrefix, routePath string, ingress bool) (envoylistener.Filter, error) {
switch protocol {
case "grpc":
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, true, true)
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, routePath, ingress, true, true)
case "http2":
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, false, true)
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, routePath, ingress, false, true)
case "http":
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, ingress, false, false)
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, routePath, ingress, false, false)
case "tcp":
fallthrough
default:
@ -471,7 +644,7 @@ func makeStatPrefix(protocol, prefix, filterName string) string {
func makeHTTPFilter(
useRDS bool,
filterName, cluster, statPrefix string,
filterName, cluster, statPrefix, routePath string,
ingress, grpc, http2 bool,
) (envoylistener.Filter, error) {
op := envoyhttp.INGRESS
@ -482,6 +655,7 @@ func makeHTTPFilter(
if grpc {
proto = "grpc"
}
cfg := &envoyhttp.HttpConnectionManager{
StatPrefix: makeStatPrefix(proto, statPrefix, filterName),
CodecType: envoyhttp.AUTO,
@ -517,33 +691,39 @@ func makeHTTPFilter(
if cluster == "" {
return envoylistener.Filter{}, fmt.Errorf("must specify cluster name when not using RDS")
}
route := envoyroute.Route{
Match: envoyroute.RouteMatch{
PathSpecifier: &envoyroute.RouteMatch_Prefix{
Prefix: "/",
},
// TODO(banks) Envoy supports matching only valid GRPC
// requests which might be nice to add here for gRPC services
// but it's not supported in our current envoy SDK version
// although docs say it was supported by 1.8.0. Going to defer
// that until we've updated the deps.
},
Action: &envoyroute.Route_Route{
Route: &envoyroute.RouteAction{
ClusterSpecifier: &envoyroute.RouteAction_Cluster{
Cluster: cluster,
},
},
},
}
// If a path is provided, do not match on a catch-all prefix
if routePath != "" {
route.Match.PathSpecifier = &envoyroute.RouteMatch_Path{Path: routePath}
}
cfg.RouteSpecifier = &envoyhttp.HttpConnectionManager_RouteConfig{
RouteConfig: &envoy.RouteConfiguration{
Name: filterName,
VirtualHosts: []envoyroute.VirtualHost{
envoyroute.VirtualHost{
{
Name: filterName,
Domains: []string{"*"},
Routes: []envoyroute.Route{
envoyroute.Route{
Match: envoyroute.RouteMatch{
PathSpecifier: &envoyroute.RouteMatch_Prefix{
Prefix: "/",
},
// TODO(banks) Envoy supports matching only valid GRPC
// requests which might be nice to add here for gRPC services
// but it's not supported in our current envoy SDK version
// although docs say it was supported by 1.8.0. Going to defer
// that until we've updated the deps.
},
Action: &envoyroute.Route_Route{
Route: &envoyroute.RouteAction{
ClusterSpecifier: &envoyroute.RouteAction_Cluster{
Cluster: cluster,
},
},
},
},
route,
},
},
},
@ -557,7 +737,7 @@ func makeHTTPFilter(
if grpc {
// Add grpc bridge before router
cfg.HttpFilters = append([]*envoyhttp.HttpFilter{&envoyhttp.HttpFilter{
cfg.HttpFilters = append([]*envoyhttp.HttpFilter{{
Name: "envoy.grpc_http1_bridge",
ConfigType: &envoyhttp.HttpFilter_Config{Config: &types.Struct{}},
}}, cfg.HttpFilters...)

View File

@ -204,6 +204,22 @@ func TestListenersFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshotDiscoveryChainWithFailoverThroughLocalGateway,
setup: nil,
},
{
name: "expose-paths-local-app-paths",
create: proxycfg.TestConfigSnapshotExposeConfig,
},
{
name: "expose-paths-new-cluster-http2",
create: proxycfg.TestConfigSnapshotExposeConfig,
setup: func(snap *proxycfg.ConfigSnapshot) {
snap.Proxy.Expose.Paths[1] = structs.ExposePath{
LocalPathPort: 9090,
Path: "/grpc.health.v1.Health/Check",
ListenerPort: 21501,
Protocol: "http2",
}
},
},
{
name: "mesh-gateway",
create: proxycfg.TestConfigSnapshotMeshGateway,

View File

@ -97,6 +97,18 @@ type ConnectAuthz interface {
ConnectAuthorize(token string, req *structs.ConnectAuthorizeRequest) (authz bool, reason string, m *cache.ResultMeta, err error)
}
// ServiceChecks is the interface the agent needs to expose
// for the xDS server to fetch a service's HTTP check definitions
type HTTPCheckFetcher interface {
ServiceHTTPBasedChecks(serviceID string) []structs.CheckType
}
// ConfigFetcher is the interface the agent needs to expose
// for the xDS server to fetch agent config, currently only one field is fetched
type ConfigFetcher interface {
AdvertiseAddrLAN() string
}
// ConfigManager is the interface xds.Server requires to consume proxy config
// updates. It's satisfied normally by the agent's proxycfg.Manager, but allows
// easier testing without several layers of mocked cache, local state and
@ -121,6 +133,8 @@ type Server struct {
// This is only used during idle periods of stream interactions (i.e. when
// there has been no recent DiscoveryRequest).
AuthCheckFrequency time.Duration
CheckFetcher HTTPCheckFetcher
CfgFetcher ConfigFetcher
}
// Initialize will finish configuring the Server for first use.

View File

@ -0,0 +1,32 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
"name": "local_app",
"type": "STATIC",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "local_app",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 8080
}
}
}
}
]
}
]
}
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.Cluster",
"nonce": "00000001"
}

View File

@ -0,0 +1,60 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
"name": "exposed_cluster_9090",
"type": "STATIC",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "exposed_cluster_9090",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 9090
}
}
}
}
]
}
]
},
"http2ProtocolOptions": {
}
},
{
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
"name": "local_app",
"type": "STATIC",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "local_app",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 8080
}
}
}
}
]
}
]
}
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.Cluster",
"nonce": "00000001"
}

View File

@ -0,0 +1,154 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "exposed_path_health1:1.2.3.4:21500",
"address": {
"socketAddress": {
"address": "1.2.3.4",
"portValue": 21500
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.http_connection_manager",
"config": {
"http_filters": [
{
"name": "envoy.router"
}
],
"route_config": {
"name": "exposed_path_filter_health1_21500",
"virtual_hosts": [
{
"domains": [
"*"
],
"name": "exposed_path_filter_health1_21500",
"routes": [
{
"match": {
"path": "/health1"
},
"route": {
"cluster": "local_app"
}
}
]
}
]
},
"stat_prefix": "exposed_path_filter_health1_21500_http",
"tracing": {
"random_sampling": {
}
}
}
}
]
}
]
},
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "exposed_path_health2:1.2.3.4:21501",
"address": {
"socketAddress": {
"address": "1.2.3.4",
"portValue": 21501
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.http_connection_manager",
"config": {
"http_filters": [
{
"name": "envoy.router"
}
],
"route_config": {
"name": "exposed_path_filter_health2_21501",
"virtual_hosts": [
{
"domains": [
"*"
],
"name": "exposed_path_filter_health2_21501",
"routes": [
{
"match": {
"path": "/health2"
},
"route": {
"cluster": "local_app"
}
}
]
}
]
},
"stat_prefix": "exposed_path_filter_health2_21501_http",
"tracing": {
"random_sampling": {
}
}
}
}
]
}
]
},
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "public_listener:1.2.3.4:8080",
"address": {
"socketAddress": {
"address": "1.2.3.4",
"portValue": 8080
}
},
"filterChains": [
{
"tlsContext": {
"requireClientCertificate": true
},
"filters": [
{
"name": "envoy.ext_authz",
"config": {
"grpc_service": {
"envoy_grpc": {
"cluster_name": "local_agent"
},
"initial_metadata": [
{
"key": "x-consul-token",
"value": "my-token"
}
]
},
"stat_prefix": "connect_authz"
}
},
{
"name": "envoy.tcp_proxy",
"config": {
"cluster": "local_app",
"stat_prefix": "public_listener_tcp"
}
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.Listener",
"nonce": "00000001"
}

View File

@ -0,0 +1,156 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "exposed_path_grpchealthv1HealthCheck:1.2.3.4:21501",
"address": {
"socketAddress": {
"address": "1.2.3.4",
"portValue": 21501
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.http_connection_manager",
"config": {
"http2_protocol_options": {
},
"http_filters": [
{
"name": "envoy.router"
}
],
"route_config": {
"name": "exposed_path_filter_grpchealthv1HealthCheck_21501",
"virtual_hosts": [
{
"domains": [
"*"
],
"name": "exposed_path_filter_grpchealthv1HealthCheck_21501",
"routes": [
{
"match": {
"path": "/grpc.health.v1.Health/Check"
},
"route": {
"cluster": "exposed_cluster_9090"
}
}
]
}
]
},
"stat_prefix": "exposed_path_filter_grpchealthv1HealthCheck_21501_http",
"tracing": {
"random_sampling": {
}
}
}
}
]
}
]
},
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "exposed_path_health1:1.2.3.4:21500",
"address": {
"socketAddress": {
"address": "1.2.3.4",
"portValue": 21500
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.http_connection_manager",
"config": {
"http_filters": [
{
"name": "envoy.router"
}
],
"route_config": {
"name": "exposed_path_filter_health1_21500",
"virtual_hosts": [
{
"domains": [
"*"
],
"name": "exposed_path_filter_health1_21500",
"routes": [
{
"match": {
"path": "/health1"
},
"route": {
"cluster": "local_app"
}
}
]
}
]
},
"stat_prefix": "exposed_path_filter_health1_21500_http",
"tracing": {
"random_sampling": {
}
}
}
}
]
}
]
},
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "public_listener:1.2.3.4:8080",
"address": {
"socketAddress": {
"address": "1.2.3.4",
"portValue": 8080
}
},
"filterChains": [
{
"tlsContext": {
"requireClientCertificate": true
},
"filters": [
{
"name": "envoy.ext_authz",
"config": {
"grpc_service": {
"envoy_grpc": {
"cluster_name": "local_agent"
},
"initial_metadata": [
{
"key": "x-consul-token",
"value": "my-token"
}
]
},
"stat_prefix": "connect_authz"
}
},
{
"name": "envoy.tcp_proxy",
"config": {
"cluster": "local_app",
"stat_prefix": "public_listener_tcp"
}
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.Listener",
"nonce": "00000001"
}

View File

@ -103,6 +103,7 @@ type AgentServiceConnectProxyConfig struct {
Config map[string]interface{} `json:",omitempty" bexpr:"-"`
Upstreams []Upstream `json:",omitempty"`
MeshGateway MeshGatewayConfig `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"`
}
// AgentMember represents a cluster member known to the agent

View File

@ -1567,3 +1567,45 @@ func TestAgentService_Register_MeshGateway(t *testing.T) {
require.Contains(t, svc.Proxy.Config, "foo")
require.Equal(t, "bar", svc.Proxy.Config["foo"])
}
func TestAgentService_ExposeChecks(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
path := ExposePath{
LocalPathPort: 8080,
ListenerPort: 21500,
Path: "/metrics",
Protocol: "http2",
}
reg := AgentServiceRegistration{
Kind: ServiceKindConnectProxy,
Name: "expose-proxy",
Address: "10.1.2.3",
Port: 8443,
Proxy: &AgentServiceConnectProxyConfig{
DestinationServiceName: "expose",
Expose: ExposeConfig{
Checks: true,
Paths: []ExposePath{
path,
},
},
},
}
err := agent.ServiceRegister(&reg)
require.NoError(t, err)
svc, _, err := agent.Service("expose-proxy", nil)
require.NoError(t, err)
require.NotNil(t, svc)
require.Equal(t, ServiceKindConnectProxy, svc.Kind)
require.NotNil(t, svc.Proxy)
require.Len(t, svc.Proxy.Expose.Paths, 1)
require.True(t, svc.Proxy.Expose.Checks)
require.Equal(t, path, svc.Proxy.Expose.Paths[0])
}

View File

@ -56,11 +56,41 @@ type MeshGatewayConfig struct {
Mode MeshGatewayMode `json:",omitempty"`
}
// ExposeConfig describes HTTP paths to expose through Envoy outside of Connect.
// Users can expose individual paths and/or all HTTP/GRPC paths for checks.
type ExposeConfig struct {
// Checks defines whether paths associated with Consul checks will be exposed.
// This flag triggers exposing all HTTP and GRPC check paths registered for the service.
Checks bool `json:",omitempty"`
// Paths is the list of paths exposed through the proxy.
Paths []ExposePath `json:",omitempty"`
}
type ExposePath struct {
// ListenerPort defines the port of the proxy's listener for exposed paths.
ListenerPort int `json:",omitempty"`
// Path is the path to expose through the proxy, ie. "/metrics."
Path string `json:",omitempty"`
// LocalPathPort is the port that the service is listening on for the given path.
LocalPathPort int `json:",omitempty"`
// Protocol describes the upstream's service protocol.
// Valid values are "http" and "http2", defaults to "http"
Protocol string `json:",omitempty"`
// ParsedFromCheck is set if this path was parsed from a registered check
ParsedFromCheck bool
}
type ServiceConfigEntry struct {
Kind string
Name string
Protocol string `json:",omitempty"`
MeshGateway MeshGatewayConfig `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"`
ExternalSNI string `json:",omitempty"`
CreateIndex uint64
ModifyIndex uint64
@ -87,6 +117,7 @@ type ProxyConfigEntry struct {
Name string
Config map[string]interface{} `json:",omitempty"`
MeshGateway MeshGatewayConfig `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"`
CreateIndex uint64
ModifyIndex uint64
}

View File

@ -195,6 +195,76 @@ func TestDecodeConfigEntry(t *testing.T) {
expect ConfigEntry
expectErr string
}{
{
name: "expose-paths: kitchen sink proxy",
body: `
{
"Kind": "proxy-defaults",
"Name": "global",
"Expose": {
"Checks": true,
"Paths": [
{
"LocalPathPort": 8080,
"ListenerPort": 21500,
"Path": "/healthz",
"Protocol": "http2"
}
]
}
}
`,
expect: &ProxyConfigEntry{
Kind: "proxy-defaults",
Name: "global",
Expose: ExposeConfig{
Checks: true,
Paths: []ExposePath{
{
LocalPathPort: 8080,
ListenerPort: 21500,
Path: "/healthz",
Protocol: "http2",
},
},
},
},
},
{
name: "expose-paths: kitchen sink service default",
body: `
{
"Kind": "service-defaults",
"Name": "global",
"Expose": {
"Checks": true,
"Paths": [
{
"LocalPathPort": 8080,
"ListenerPort": 21500,
"Path": "/healthz",
"Protocol": "http2"
}
]
}
}
`,
expect: &ServiceConfigEntry{
Kind: "service-defaults",
Name: "global",
Expose: ExposeConfig{
Checks: true,
Paths: []ExposePath{
{
LocalPathPort: 8080,
ListenerPort: 21500,
Path: "/healthz",
Protocol: "http2",
},
},
},
},
},
{
name: "proxy-defaults",
body: `

View File

@ -1042,6 +1042,86 @@ func TestParseConfigEntry(t *testing.T) {
Name: "main",
},
},
{
name: "expose paths: kitchen sink proxy defaults",
snake: `
kind = "proxy-defaults"
name = "global"
expose = {
checks = true
paths = [
{
local_path_port = 8080
listener_port = 21500
path = "/healthz"
protocol = "http2"
}
]
}`,
camel: `
Kind = "proxy-defaults"
Name = "global"
Expose = {
Checks = true
Paths = [
{
LocalPathPort = 8080
ListenerPort = 21500
Path = "/healthz"
Protocol = "http2"
}
]
}`,
snakeJSON: `
{
"kind": "proxy-defaults",
"name": "global",
"expose": {
"checks": true,
"paths": [
{
"local_path_port": 8080,
"listener_port": 21500,
"path": "/healthz",
"protocol": "http2"
}
]
}
}
`,
camelJSON: `
{
"Kind": "proxy-defaults",
"Name": "global",
"Expose": {
"Checks": true,
"Paths": [
{
"LocalPathPort": 8080,
"ListenerPort": 21500,
"Path": "/healthz",
"Protocol": "http2"
}
]
}
}
`,
expect: &api.ProxyConfigEntry{
Kind: "proxy-defaults",
Name: "global",
Expose: api.ExposeConfig{
Checks: true,
Paths: []api.ExposePath{
{
ListenerPort: 21500,
Path: "/healthz",
LocalPathPort: 8080,
Protocol: "http2",
},
},
},
},
},
} {
tc := tc

View File

@ -0,0 +1,4 @@
#!/bin/bash
snapshot_envoy_admin localhost:19000 s1 primary || true
snapshot_envoy_admin localhost:19001 s2 primary || true

View File

@ -54,6 +54,28 @@ config {
for all proxies. Added in v1.6.0.
- `Mode` `(string: "")` - One of `none`, `local`, or `remote`.
- `Expose` `(ExposeConfig: <optional>)` - Controls the default
[expose path configuration](/docs/connect/registration/service-registration.html#expose-paths-configuration-reference)
for Envoy. Added in v1.6.2.
Exposing paths through Envoy enables a service to protect itself by only listening on localhost, while still allowing
non-Connect-enabled applications to contact an HTTP endpoint.
Some examples include: exposing a `/metrics` path for Prometheus or `/healthz` for kubelet liveness checks.
- `Checks` `(bool: false)` - If enabled, all HTTP and gRPC checks registered with the agent are exposed through Envoy.
Envoy will expose listeners for these checks and will only accept connections originating from localhost or Consul's
[advertise address](/docs/agent/options.html#advertise). The port for these listeners are dynamically allocated from
[expose_min_port](/docs/agent/options.html#expose_min_port) to [expose_max_port](/docs/agent/options.html#expose_max_port).
This flag is useful when a Consul client cannot reach registered services over localhost. One example is when running
Consul on Kubernetes, and Consul agents run in their own pods.
- `Paths` `array<Path>: []` - A list of paths to expose through Envoy.
- `Path` `(string: "")` - The HTTP path to expose. The path must be prefixed by a slash. ie: `/metrics`.
- `LocalPathPort` `(int: 0)` - The port where the local service is listening for connections to the path.
- `ListenerPort` `(int: 0)` - The port where the proxy will listen for connections. This port must be available
for the listener to be set up. If the port is not free then Envoy will not expose a listener for the path,
but the proxy registration will not fail.
- `Protocol` `(string: "http")` - Sets the protocol of the listener. One of `http` or `http2`. For gRPC use `http2`.
## ACLs

View File

@ -43,6 +43,28 @@ Protocol = "http"
the TLS [SNI](https://en.wikipedia.org/wiki/Server_Name_Indication) value to
be changed to a non-connect value when federating with an external system.
Added in v1.6.0.
- `Expose` `(ExposeConfig: <optional>)` - Controls the default
[expose path configuration](/docs/connect/registration/service-registration.html#expose-paths-configuration-reference)
for Envoy. Added in v1.6.2.
Exposing paths through Envoy enables a service to protect itself by only listening on localhost, while still allowing
non-Connect-enabled applications to contact an HTTP endpoint.
Some examples include: exposing a `/metrics` path for Prometheus or `/healthz` for kubelet liveness checks.
- `Checks` `(bool: false)` - If enabled, all HTTP and gRPC checks registered with the agent are exposed through Envoy.
Envoy will expose listeners for these checks and will only accept connections originating from localhost or Consul's
[advertise address](/docs/agent/options.html#advertise). The port for these listeners are dynamically allocated from
[expose_min_port](/docs/agent/options.html#expose_min_port) to [expose_max_port](/docs/agent/options.html#expose_max_port).
This flag is useful when a Consul client cannot reach registered services over localhost. One example is when running
Consul on Kubernetes, and Consul agents run in their own pods.
- `Paths` `array<Path>: []` - A list of paths to expose through Envoy.
- `Path` `(string: "")` - The HTTP path to expose. The path must be prefixed by a slash. ie: `/metrics`.
- `LocalPathPort` `(int: 0)` - The port where the local service is listening for connections to the path.
- `ListenerPort` `(int: 0)` - The port where the proxy will listen for connections. This port must be available for
the listener to be set up. If the port is not free then Envoy will not expose a listener for the path,
but the proxy registration will not fail.
- `Protocol` `(string: "http")` - Sets the protocol of the listener. One of `http` or `http2`. For gRPC use `http2`.
## ACLs

View File

@ -1381,6 +1381,16 @@ default will automatically work with some tooling.
number to use for automatically assigned [sidecar service
registrations](/docs/connect/registration/sidecar-service.html). Default 21255.
Set to `0` to disable automatic port assignment.
* <a name="expose_min_port"></a><a
href="#expose_min_port">`expose_min_port`</a> - Inclusive minimum port
number to use for automatically assigned
[exposed check listeners](/docs/connect/registration/service-registration.html#expose-paths-configuration-reference).
Default 21500. Set to `0` to disable automatic port assignment.
* <a name="expose_max_port"></a><a
href="#expose_max_port">`expose_max_port`</a> - Inclusive maximum port
number to use for automatically assigned
[exposed check listeners](/docs/connect/registration/service-registration.html#expose-paths-configuration-reference).
Default 21755. Set to `0` to disable automatic port assignment.
* <a name="protocol"></a><a href="#protocol">`protocol`</a> Equivalent to the
[`-protocol` command-line flag](#_protocol).

View File

@ -66,6 +66,17 @@ example shows all possible fields, but note that only a few are required.
"upstreams": [],
"mesh_gateway": {
"mode": "local"
},
"expose": {
"checks": true,
"paths": [
{
"path": "/healthz",
"local_path_port": 8080,
"listener_port": 21500,
"protocol": "http2"
}
]
}
},
"connect": {
@ -140,6 +151,30 @@ For Consul 0.9.3 and earlier you need to use `enableTagOverride`. Consul 1.0
supports both `enable_tag_override` and `enableTagOverride` but the latter is
deprecated and has been removed as of Consul 1.1.
### Checks
A service can have an associated health check. This is a powerful feature as
it allows a web balancer to gracefully remove failing nodes, a database
to replace a failed secondary, etc. The health check is strongly integrated in
the DNS interface as well. If a service is failing its health check or a
node has any failing system-level check, the DNS interface will omit that
node from any service query.
There are several check types that have differing required options as
[documented here](/docs/agent/checks.html). The check name is automatically
generated as `service:<service-id>`. If there are multiple service checks
registered, the ID will be generated as `service:<service-id>:<num>` where
`<num>` is an incrementing number starting from `1`.
-> **Note:** There is more information about [checks here](/docs/agent/checks.html).
### Proxy
Service definitions allow for an optional proxy registration. Proxies used with Connect
are registered as services in Consul's catalog.
See the [Proxy Service Registration](/docs/connect/registration/service-registration.html) reference
for the available configuration options.
### Connect
The `kind` field is used to optionally identify the service as a [Connect
@ -170,23 +205,6 @@ supported "Managed" proxies which are specified with the `connect.proxy` field.
[Managed Proxies are deprecated](/docs/connect/proxies/managed-deprecated.html)
and the `connect.proxy` field will be removed in a future major release.
### Checks
A service can have an associated health check. This is a powerful feature as
it allows a web balancer to gracefully remove failing nodes, a database
to replace a failed secondary, etc. The health check is strongly integrated in
the DNS interface as well. If a service is failing its health check or a
node has any failing system-level check, the DNS interface will omit that
node from any service query.
There are several check types that have differing required options as
[documented here](/docs/agent/checks.html). The check name is automatically
generated as `service:<service-id>`. If there are multiple service checks
registered, the ID will be generated as `service:<service-id>:<num>` where
`<num>` is an incrementing number starting from `1`.
-> **Note:** There is more information about [checks here](/docs/agent/checks.html).
### DNS SRV Weights
The `weights` field is an optional field to specify the weight of a service in

View File

@ -71,6 +71,7 @@ The dynamic configuration Consul Connect provides to each Envoy instance include
- Service-discovery results for upstreams to enable each sidecar proxy to load-balance
outgoing connections.
- L7 configuration including timeouts and protocol-specific options.
- Configuration to [expose specific HTTP paths](/docs/connect/registration/service-registration.html#expose-paths-configuration-reference).
For more information on the parts of the Envoy proxy runtime configuration
that are currently controllable via Consul Connect see [Dynamic
@ -155,7 +156,7 @@ each service such as which protocol they speak. Consul will use this information
to configure appropriate proxy settings for that service's proxies and also for
the upstream listeners of any downstream service.
Users can define a service's protocol in its [`service-defaults` configuration
One example is how users can define a service's protocol in a [`service-defaults` configuration
entry](/docs/agent/config-entries/service-defaults.html). Agents with
[`enable_central_service_config`](/docs/agent/options.html#enable_central_service_config)
set to true will automatically discover the protocol when configuring a proxy
@ -169,6 +170,9 @@ and `proxy.upstreams[*].config` fields of the [proxy service
definition](/docs/connect/registration/service-registration.html) that is
actually registered.
To learn about other options that can be configured centrally see the
[Configuration Entries](/docs/agent/config_entries.html) docs.
### Proxy Config Options
These fields may also be overridden explicitly in the [proxy service

View File

@ -82,7 +82,8 @@ registering a proxy instance.
"local_service_port": 9090,
"config": {},
"upstreams": [],
"mesh_gateway": {}
"mesh_gateway": {},
"expose": {}
},
"port": 8181
}
@ -122,12 +123,16 @@ registering a proxy instance.
- `mesh_gateway` `(object: {})` - Specifies the mesh gateway configuration
for this proxy. The format is defined in the [Mesh Gateway Configuration Reference](#mesh-gateway-configuration-reference).
- `expose` `(object: {})` - Specifies the configuration to expose HTTP paths through this proxy.
The format is defined in the [Expose Paths Configuration Reference](#expose-paths-configuration-reference),
and is only compatible with an Envoy proxy.
### Upstream Configuration Reference
The following examples show all possible upstream configuration parameters.
Note that `snake_case` is used here as it works in both [config file and API
-> Note that `snake_case` is used here as it works in both [config file and API
registrations](/docs/agent/services.html#service-definition-parameter-case).
Upstreams support multiple destination types. Both examples are shown below
@ -185,13 +190,16 @@ followed by documentation for each attribute.
reference](/docs/connect/configuration.html#envoy-options)
* `mesh_gateway` `(object: {})` - Specifies the mesh gateway configuration
for this proxy. The format is defined in the [Mesh Gateway Configuration Reference](#mesh-gateway-configuration-reference).
* `expose` `(object: {})` - Specifies the configuration to expose HTTP paths through this proxy.
The format is defined in the [Expose Paths Configuration Reference](#expose-paths-configuration-reference),
and is only compatible with an Envoy proxy.
### Mesh Gateway Configuration Reference
The following examples show all possible mesh gateway configurations.
Note that `snake_case` is used here as it works in both [config file and API
-> Note that `snake_case` is used here as it works in both [config file and API
registrations](/docs/agent/services.html#service-definition-parameter-case).
#### Using a Local/Egress Gateway in the Local Datacenter
@ -237,3 +245,71 @@ registrations](/docs/agent/services.html#service-definition-parameter-case).
2. Proxy Service's `Proxy` configuration
3. The `service-defaults` configuration for the service.
4. The `global` `proxy-defaults`.
### Expose Paths Configuration Reference
The following examples show possible configurations to expose HTTP paths through Envoy.
Exposing paths through Envoy enables a service to protect itself by only listening on localhost, while still allowing
non-Connect-enabled applications to contact an HTTP endpoint.
Some examples include: exposing a `/metrics` path for Prometheus or `/healthz` for kubelet liveness checks.
-> Note that `snake_case` is used here as it works in both [config file and API
registrations](/docs/agent/services.html#service-definition-parameter-case).
#### Expose listeners in Envoy for HTTP and GRPC checks registered with the local Consul agent
```json
{
"expose": {
"checks": true
}
}
```
#### Expose an HTTP listener in Envoy at port 2150 that routes to an HTTP server listening at port 8080
```json
{
"expose": {
"paths": [
{
"path": "/healthz",
"local_path_port": 8080,
"listener_port": 21500
}
]
}
}
```
#### Expose an HTTP2 listener in Envoy at port 21501 that routes to a gRPC server listening at port 9090
```json
{
"expose": {
"paths": [
{
"path": "/grpc.health.v1.Health/Check",
"protocol": "http2",
"local_path_port": 9090,
"listener_port": 21500
}
]
}
}
```
* `checks` `(bool: false)` - If enabled, all HTTP and gRPC checks registered with the agent are exposed through Envoy.
Envoy will expose listeners for these checks and will only accept connections originating from localhost or Consul's
[advertise address](/docs/agent/options.html#advertise). The port for these listeners are dynamically allocated from
[expose_min_port](/docs/agent/options.html#expose_min_port) to [expose_max_port](/docs/agent/options.html#expose_max_port).
This flag is useful when a Consul client cannot reach registered services over localhost. One example is when running
Consul on Kubernetes, and Consul agents run in their own pods.
* `paths` `array<Path>: []` - A list of paths to expose through Envoy.
- `path` `(string: "")` - The HTTP path to expose. The path must be prefixed by a slash. ie: `/metrics`.
- `local_path_port` `(int: 0)` - The port where the local service is listening for connections to the path.
- `listener_port` `(int: 0)` - The port where the proxy will listen for connections. This port must be available for
the listener to be set up. If the port is not free then Envoy will not expose a listener for the path,
but the proxy registration will not fail.
- `protocol` `(string: "http")` - Sets the protocol of the listener. One of `http` or `http2`. For gRPC use `http2`.