Connect Envoy Command (#4735)

* Plumb xDS server and proxyxfg into the agent startup

* Add `consul connect envoy` command to allow running Envoy as a connect sidecar.

* Add test for help tabs; typos and style fixups from review
pull/4776/head
Paul Banks 2018-10-03 20:37:53 +01:00
parent 1909a95118
commit dca1303d05
17 changed files with 909 additions and 184 deletions

View File

@ -18,6 +18,8 @@ import (
"sync"
"time"
"google.golang.org/grpc"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/ae"
@ -27,10 +29,12 @@ import (
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxyprocess"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
@ -218,8 +222,23 @@ type Agent struct {
// proxyManager is the proxy process manager for managed Connect proxies.
proxyManager *proxyprocess.Manager
// proxyLock protects proxy information in the local state from concurrent modification
// proxyLock protects _managed_ proxy information in the local state from
// concurrent modification. It is not needed to work with proxyConfig state.
proxyLock sync.Mutex
// proxyConfig is the manager for proxy service (Kind = connect-proxy)
// configuration state. This ensures all state needed by a proxy registration
// is maintained in cache and handles pushing updates to that state into XDS
// server to be pushed out to Envoy. This is NOT related to managed proxies
// directly.
proxyConfig *proxycfg.Manager
// xdsServer is the Server instance that serves xDS gRPC API.
xdsServer *xds.Server
// grpcServer is the server instance used currently to serve xDS API for
// Envoy.
grpcServer *grpc.Server
}
func New(c *config.RuntimeConfig) (*Agent, error) {
@ -409,6 +428,21 @@ func (a *Agent) Start() error {
}
}
// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
Cache: a.cache,
Logger: a.logger,
State: a.State,
Source: &structs.QuerySource{
Node: a.config.NodeName,
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
},
})
if err != nil {
return err
}
// Start watching for critical services to deregister, based on their
// checks.
go a.reapServices()
@ -446,6 +480,11 @@ func (a *Agent) Start() error {
a.httpServers = append(a.httpServers, srv)
}
// Start gRPC server.
if err := a.listenAndServeGRPC(); err != nil {
return err
}
// register watches
if err := a.reloadWatches(a.config); err != nil {
return err
@ -458,6 +497,43 @@ func (a *Agent) Start() error {
return nil
}
func (a *Agent) listenAndServeGRPC() error {
if len(a.config.GRPCAddrs) < 1 {
return nil
}
a.xdsServer = &xds.Server{
Logger: a.logger,
CfgMgr: a.proxyConfig,
Authz: a,
ResolveToken: func(id string) (acl.ACL, error) {
return a.resolveToken(id)
},
}
var err error
a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile)
if err != nil {
return err
}
ln, err := a.startListeners(a.config.GRPCAddrs)
if err != nil {
return err
}
for _, l := range ln {
go func(innerL net.Listener) {
a.logger.Printf("[INFO] agent: Started gRPC server on %s (%s)",
innerL.Addr().String(), innerL.Addr().Network())
err := a.grpcServer.Serve(innerL)
if err != nil {
a.logger.Printf("[ERR] gRPC server failed: %s", err)
}
}(l)
}
return nil
}
func (a *Agent) listenAndServeDNS() error {
notif := make(chan net.Addr, len(a.config.DNSAddrs))
errCh := make(chan error, len(a.config.DNSAddrs))
@ -497,6 +573,34 @@ func (a *Agent) listenAndServeDNS() error {
return merr.ErrorOrNil()
}
func (a *Agent) startListeners(addrs []net.Addr) ([]net.Listener, error) {
var ln []net.Listener
for _, addr := range addrs {
var l net.Listener
var err error
switch x := addr.(type) {
case *net.UnixAddr:
l, err = a.listenSocket(x.Name)
if err != nil {
return nil, err
}
case *net.TCPAddr:
l, err = net.Listen("tcp", x.String())
if err != nil {
return nil, err
}
l = &tcpKeepAliveListener{l.(*net.TCPListener)}
default:
return nil, fmt.Errorf("unsupported address type %T", addr)
}
ln = append(ln, l)
}
return ln, nil
}
// listenHTTP binds listeners to the provided addresses and also returns
// pre-configured HTTP servers which are not yet started. The motivation is
// that in the current startup/shutdown setup we de-couple the listener
@ -516,38 +620,21 @@ func (a *Agent) listenHTTP() ([]*HTTPServer, error) {
var ln []net.Listener
var servers []*HTTPServer
start := func(proto string, addrs []net.Addr) error {
for _, addr := range addrs {
var l net.Listener
listeners, err := a.startListeners(addrs)
if err != nil {
return err
}
for _, l := range listeners {
var tlscfg *tls.Config
var err error
switch x := addr.(type) {
case *net.UnixAddr:
l, err = a.listenSocket(x.Name)
_, isTCP := l.(*tcpKeepAliveListener)
if isTCP && proto == "https" {
tlscfg, err = a.config.IncomingHTTPSConfig()
if err != nil {
return err
}
case *net.TCPAddr:
l, err = net.Listen("tcp", x.String())
if err != nil {
return err
}
l = &tcpKeepAliveListener{l.(*net.TCPListener)}
if proto == "https" {
tlscfg, err = a.config.IncomingHTTPSConfig()
if err != nil {
return err
}
l = tls.NewListener(l, tlscfg)
}
default:
return fmt.Errorf("unsupported address type %T", addr)
l = tls.NewListener(l, tlscfg)
}
ln = append(ln, l)
srv := &HTTPServer{
Server: &http.Server{
Addr: l.Addr().String(),
@ -569,6 +656,7 @@ func (a *Agent) listenHTTP() ([]*HTTPServer, error) {
}
}
ln = append(ln, l)
servers = append(servers, srv)
}
return nil
@ -1341,7 +1429,17 @@ func (a *Agent) ShutdownAgent() error {
chk.Stop()
}
// Stop the proxy manager
// Stop gRPC
if a.grpcServer != nil {
a.grpcServer.Stop()
}
// Stop the proxy config manager
if a.proxyConfig != nil {
a.proxyConfig.Close()
}
// Stop the proxy process manager
if a.proxyManager != nil {
// If persistence is disabled (implies DevMode but a subset of DevMode) then
// don't leave the proxies running since the agent will not be able to

View File

@ -6,7 +6,6 @@ import (
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
@ -20,7 +19,6 @@ import (
"github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
@ -1432,132 +1430,14 @@ func (s *HTTPServer) AgentConnectAuthorize(resp http.ResponseWriter, req *http.R
// Decode the request from the request body
var authReq structs.ConnectAuthorizeRequest
if err := decodeBody(req, &authReq, nil); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Request decode failed: %v", err)
return nil, nil
return nil, BadRequestError{fmt.Sprintf("Request decode failed: %v", err)}
}
// We need to have a target to check intentions
if authReq.Target == "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Target service must be specified")
return nil, nil
}
// Parse the certificate URI from the client ID
uriRaw, err := url.Parse(authReq.ClientCertURI)
if err != nil {
return &connectAuthorizeResp{
Authorized: false,
Reason: fmt.Sprintf("Client ID must be a URI: %s", err),
}, nil
}
uri, err := connect.ParseCertURI(uriRaw)
if err != nil {
return &connectAuthorizeResp{
Authorized: false,
Reason: fmt.Sprintf("Invalid client ID: %s", err),
}, nil
}
uriService, ok := uri.(*connect.SpiffeIDService)
if !ok {
return &connectAuthorizeResp{
Authorized: false,
Reason: "Client ID must be a valid SPIFFE service URI",
}, nil
}
// We need to verify service:write permissions for the given token.
// We do this manually here since the RPC request below only verifies
// service:read.
rule, err := s.agent.resolveToken(token)
authz, reason, cacheMeta, err := s.agent.ConnectAuthorize(token, &authReq)
if err != nil {
return nil, err
}
if rule != nil && !rule.ServiceWrite(authReq.Target, nil) {
return nil, acl.ErrPermissionDenied
}
// Validate the trust domain matches ours. Later we will support explicit
// external federation but not built yet.
rootArgs := &structs.DCSpecificRequest{Datacenter: s.agent.config.Datacenter}
raw, _, err := s.agent.cache.Get(cachetype.ConnectCARootName, rootArgs)
if err != nil {
return nil, err
}
roots, ok := raw.(*structs.IndexedCARoots)
if !ok {
return nil, fmt.Errorf("internal error: roots response type not correct")
}
if roots.TrustDomain == "" {
return nil, fmt.Errorf("connect CA not bootstrapped yet")
}
if roots.TrustDomain != strings.ToLower(uriService.Host) {
return &connectAuthorizeResp{
Authorized: false,
Reason: fmt.Sprintf("Identity from an external trust domain: %s",
uriService.Host),
}, nil
}
// TODO(banks): Implement revocation list checking here.
// Get the intentions for this target service.
args := &structs.IntentionQueryRequest{
Datacenter: s.agent.config.Datacenter,
Match: &structs.IntentionQueryMatch{
Type: structs.IntentionMatchDestination,
Entries: []structs.IntentionMatchEntry{
{
Namespace: structs.IntentionDefaultNamespace,
Name: authReq.Target,
},
},
},
}
args.Token = token
raw, m, err := s.agent.cache.Get(cachetype.IntentionMatchName, args)
if err != nil {
return nil, err
}
setCacheMeta(resp, &m)
reply, ok := raw.(*structs.IndexedIntentionMatches)
if !ok {
return nil, fmt.Errorf("internal error: response type not correct")
}
if len(reply.Matches) != 1 {
return nil, fmt.Errorf("Internal error loading matches")
}
// Test the authorization for each match
for _, ixn := range reply.Matches[0] {
if auth, ok := uriService.Authorize(ixn); ok {
return &connectAuthorizeResp{
Authorized: auth,
Reason: fmt.Sprintf("Matched intention: %s", ixn.String()),
}, nil
}
}
// No match, we need to determine the default behavior. We do this by
// specifying the anonymous token token, which will get that behavior.
// The default behavior if ACLs are disabled is to allow connections
// to mimic the behavior of Consul itself: everything is allowed if
// ACLs are disabled.
rule, err = s.agent.resolveToken("")
if err != nil {
return nil, err
}
authz := true
reason := "ACLs disabled, access is allowed by default"
if rule != nil {
authz = rule.IntentionDefaultAllow()
reason = "Default behavior configured by ACLs"
}
setCacheMeta(resp, cacheMeta)
return &connectAuthorizeResp{
Authorized: authz,

View File

@ -4938,6 +4938,7 @@ func TestAgentConnectAuthorize_badBody(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
@ -4945,16 +4946,19 @@ func TestAgentConnectAuthorize_badBody(t *testing.T) {
args := []string{}
req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args))
resp := httptest.NewRecorder()
_, err := a.srv.AgentConnectAuthorize(resp, req)
assert.Nil(err)
assert.Equal(400, resp.Code)
assert.Contains(resp.Body.String(), "decode")
respRaw, err := a.srv.AgentConnectAuthorize(resp, req)
require.Error(err)
assert.Nil(respRaw)
// Note that BadRequestError is handled outside the endpoint handler so we
// still see a 200 if we check here.
assert.Contains(err.Error(), "decode failed")
}
func TestAgentConnectAuthorize_noTarget(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
@ -4962,10 +4966,12 @@ func TestAgentConnectAuthorize_noTarget(t *testing.T) {
args := &structs.ConnectAuthorizeRequest{}
req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args))
resp := httptest.NewRecorder()
_, err := a.srv.AgentConnectAuthorize(resp, req)
assert.Nil(err)
assert.Equal(400, resp.Code)
assert.Contains(resp.Body.String(), "Target service")
respRaw, err := a.srv.AgentConnectAuthorize(resp, req)
require.Error(err)
assert.Nil(respRaw)
// Note that BadRequestError is handled outside the endpoint handler so we
// still see a 200 if we check here.
assert.Contains(err.Error(), "Target service must be specified")
}
// Client ID is not in the valid URI format
@ -4973,6 +4979,7 @@ func TestAgentConnectAuthorize_idInvalidFormat(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
@ -4984,12 +4991,11 @@ func TestAgentConnectAuthorize_idInvalidFormat(t *testing.T) {
req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args))
resp := httptest.NewRecorder()
respRaw, err := a.srv.AgentConnectAuthorize(resp, req)
assert.Nil(err)
assert.Equal(200, resp.Code)
obj := respRaw.(*connectAuthorizeResp)
assert.False(obj.Authorized)
assert.Contains(obj.Reason, "Invalid client")
require.Error(err)
assert.Nil(respRaw)
// Note that BadRequestError is handled outside the endpoint handler so we
// still see a 200 if we check here.
assert.Contains(err.Error(), "ClientCertURI not a valid Connect identifier")
}
// Client ID is a valid URI but its not a service URI
@ -4997,6 +5003,7 @@ func TestAgentConnectAuthorize_idNotService(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
@ -5008,12 +5015,11 @@ func TestAgentConnectAuthorize_idNotService(t *testing.T) {
req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args))
resp := httptest.NewRecorder()
respRaw, err := a.srv.AgentConnectAuthorize(resp, req)
assert.Nil(err)
assert.Equal(200, resp.Code)
obj := respRaw.(*connectAuthorizeResp)
assert.False(obj.Authorized)
assert.Contains(obj.Reason, "must be a valid")
require.Error(err)
assert.Nil(respRaw)
// Note that BadRequestError is handled outside the endpoint handler so we
// still see a 200 if we check here.
assert.Contains(err.Error(), "ClientCertURI not a valid Service identifier")
}
// Test when there is an intention allowing the connection
@ -5162,6 +5168,7 @@ func TestAgentConnectAuthorize_denyTrustDomain(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
@ -5182,7 +5189,7 @@ func TestAgentConnectAuthorize_denyTrustDomain(t *testing.T) {
req.Intention.Action = structs.IntentionActionAllow
var reply string
assert.Nil(a.RPC("Intention.Apply", &req, &reply))
require.NoError(a.RPC("Intention.Apply", &req, &reply))
}
{
@ -5193,7 +5200,7 @@ func TestAgentConnectAuthorize_denyTrustDomain(t *testing.T) {
req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args))
resp := httptest.NewRecorder()
respRaw, err := a.srv.AgentConnectAuthorize(resp, req)
assert.Nil(err)
require.NoError(err)
assert.Equal(200, resp.Code)
obj := respRaw.(*connectAuthorizeResp)
@ -5206,6 +5213,7 @@ func TestAgentConnectAuthorize_denyWildcard(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@ -5227,7 +5235,7 @@ func TestAgentConnectAuthorize_denyWildcard(t *testing.T) {
req.Intention.Action = structs.IntentionActionDeny
var reply string
assert.Nil(a.RPC("Intention.Apply", &req, &reply))
require.NoError(a.RPC("Intention.Apply", &req, &reply))
}
{
// Allow web to DB
@ -5255,7 +5263,7 @@ func TestAgentConnectAuthorize_denyWildcard(t *testing.T) {
req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args))
resp := httptest.NewRecorder()
respRaw, err := a.srv.AgentConnectAuthorize(resp, req)
assert.Nil(err)
require.NoError(err)
assert.Equal(200, resp.Code)
obj := respRaw.(*connectAuthorizeResp)
@ -5272,7 +5280,7 @@ func TestAgentConnectAuthorize_denyWildcard(t *testing.T) {
req, _ := http.NewRequest("POST", "/v1/agent/connect/authorize", jsonReader(args))
resp := httptest.NewRecorder()
respRaw, err := a.srv.AgentConnectAuthorize(resp, req)
assert.Nil(err)
require.NoError(err)
assert.Equal(200, resp.Code)
obj := respRaw.(*connectAuthorizeResp)

138
agent/connect_auth.go Normal file
View File

@ -0,0 +1,138 @@
package agent
import (
"fmt"
"strings"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
)
// ConnectAuthorize implements the core authorization logic for Connect. It's in
// a separate agent method here because we need to re-use this both in our own
// HTTP API authz endpoint and in the gRPX xDS/ext_authz API for envoy.
//
// The ACL token and the auth request are provided and the auth decision (true
// means authorised) and reason string are returned.
//
// If the request input is invalid the error returned will be a BadRequestError,
// if the token doesn't grant necessary access then an acl.ErrPermissionDenied
// error is returned, otherwise error indicates an unexpected server failure. If
// access is denied, no error is returned but the first return value is false.
func (a *Agent) ConnectAuthorize(token string,
req *structs.ConnectAuthorizeRequest) (authz bool, reason string, m *cache.ResultMeta, err error) {
// Helper to make the error cases read better without resorting to named
// returns which get messy and prone to mistakes in a method this long.
returnErr := func(err error) (bool, string, *cache.ResultMeta, error) {
return false, "", nil, err
}
if req == nil {
return returnErr(BadRequestError{"Invalid request"})
}
// We need to have a target to check intentions
if req.Target == "" {
return returnErr(BadRequestError{"Target service must be specified"})
}
// Parse the certificate URI from the client ID
uri, err := connect.ParseCertURIFromString(req.ClientCertURI)
if err != nil {
return returnErr(BadRequestError{"ClientCertURI not a valid Connect identifier"})
}
uriService, ok := uri.(*connect.SpiffeIDService)
if !ok {
return returnErr(BadRequestError{"ClientCertURI not a valid Service identifier"})
}
// We need to verify service:write permissions for the given token.
// We do this manually here since the RPC request below only verifies
// service:read.
rule, err := a.resolveToken(token)
if err != nil {
return returnErr(err)
}
if rule != nil && !rule.ServiceWrite(req.Target, nil) {
return returnErr(acl.ErrPermissionDenied)
}
// Validate the trust domain matches ours. Later we will support explicit
// external federation but not built yet.
rootArgs := &structs.DCSpecificRequest{Datacenter: a.config.Datacenter}
raw, _, err := a.cache.Get(cachetype.ConnectCARootName, rootArgs)
if err != nil {
return returnErr(err)
}
roots, ok := raw.(*structs.IndexedCARoots)
if !ok {
return returnErr(fmt.Errorf("internal error: roots response type not correct"))
}
if roots.TrustDomain == "" {
return returnErr(fmt.Errorf("Connect CA not bootstrapped yet"))
}
if roots.TrustDomain != strings.ToLower(uriService.Host) {
reason = fmt.Sprintf("Identity from an external trust domain: %s",
uriService.Host)
return false, reason, nil, nil
}
// TODO(banks): Implement revocation list checking here.
// Get the intentions for this target service.
args := &structs.IntentionQueryRequest{
Datacenter: a.config.Datacenter,
Match: &structs.IntentionQueryMatch{
Type: structs.IntentionMatchDestination,
Entries: []structs.IntentionMatchEntry{
{
Namespace: structs.IntentionDefaultNamespace,
Name: req.Target,
},
},
},
QueryOptions: structs.QueryOptions{Token: token},
}
raw, meta, err := a.cache.Get(cachetype.IntentionMatchName, args)
if err != nil {
return returnErr(err)
}
reply, ok := raw.(*structs.IndexedIntentionMatches)
if !ok {
return returnErr(fmt.Errorf("internal error: response type not correct"))
}
if len(reply.Matches) != 1 {
return returnErr(fmt.Errorf("Internal error loading matches"))
}
// Test the authorization for each match
for _, ixn := range reply.Matches[0] {
if auth, ok := uriService.Authorize(ixn); ok {
reason = fmt.Sprintf("Matched intention: %s", ixn.String())
return auth, reason, &meta, nil
}
}
// No match, we need to determine the default behavior. We do this by
// specifying the anonymous token, which will get the default behavior. The
// default behavior if ACLs are disabled is to allow connections to mimic the
// behavior of Consul itself: everything is allowed if ACLs are disabled.
rule, err = a.resolveToken("")
if err != nil {
return returnErr(err)
}
if rule == nil {
// ACLs not enabled at all, the default is allow all.
return true, "ACLs disabled, access is allowed by default", &meta, nil
}
reason = "Default behavior configured by ACLs"
return rule.IntentionDefaultAllow(), reason, &meta, nil
}

View File

@ -468,8 +468,11 @@ func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) {
// setCacheMeta sets http response headers to indicate cache status.
func setCacheMeta(resp http.ResponseWriter, m *cache.ResultMeta) {
if m == nil {
return
}
str := "MISS"
if m != nil && m.Hit {
if m.Hit {
str = "HIT"
}
resp.Header().Set("X-Cache", str)

View File

@ -76,7 +76,7 @@ type ConnectAuthz interface {
// easier testing without several layers of mocked cache, local state and
// proxycfg.Manager.
type ConfigManager interface {
Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, func())
Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc)
}
// Server represents a gRPC server that can handle both XDS and ext_authz

View File

@ -70,7 +70,7 @@ func (m *testManager) DeliverConfig(t *testing.T, proxyID string, cfg *proxycfg.
}
// Watch implements ConfigManager
func (m *testManager) Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, func()) {
func (m *testManager) Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc) {
m.Lock()
defer m.Unlock()
// ch might be nil but then it will just block forever

View File

@ -61,6 +61,12 @@ const (
// HTTPSSLVerifyEnvName defines an environment variable name which sets
// whether or not to disable certificate checking.
HTTPSSLVerifyEnvName = "CONSUL_HTTP_SSL_VERIFY"
// GRPCAddrEnvName defines an environment variable name which sets the gRPC
// address for consul connect envoy. Note this isn't actually used by the api
// client in this package but is defined here for consistency with all the
// other ENV names we use.
GRPCAddrEnvName = "CONSUL_GRPC_ADDR"
)
// QueryOptions are used to parameterize a query

View File

@ -22,6 +22,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/logutils"
"github.com/mitchellh/cli"
"google.golang.org/grpc/grpclog"
)
func New(ui cli.Ui, revision, version, versionPre, versionHuman string, shutdownCh <-chan struct{}) *cmd {
@ -202,6 +203,9 @@ func (c *cmd) run(args []string) int {
c.logOutput = logOutput
c.logger = log.New(logOutput, "", log.LstdFlags)
// Setup gRPC logger to use the same output/filtering
grpclog.SetLoggerV2(logger.NewGRPCLogger(logConfig, c.logger))
memSink, err := lib.InitTelemetry(config.Telemetry)
if err != nil {
c.UI.Error(err.Error())

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul/command/connect/ca"
caget "github.com/hashicorp/consul/command/connect/ca/get"
caset "github.com/hashicorp/consul/command/connect/ca/set"
"github.com/hashicorp/consul/command/connect/envoy"
"github.com/hashicorp/consul/command/connect/proxy"
"github.com/hashicorp/consul/command/event"
"github.com/hashicorp/consul/command/exec"
@ -77,6 +78,7 @@ func init() {
Register("connect ca get-config", func(ui cli.Ui) (cli.Command, error) { return caget.New(ui), nil })
Register("connect ca set-config", func(ui cli.Ui) (cli.Command, error) { return caset.New(ui), nil })
Register("connect proxy", func(ui cli.Ui) (cli.Command, error) { return proxy.New(ui, MakeShutdownCh()), nil })
Register("connect envoy", func(ui cli.Ui) (cli.Command, error) { return envoy.New(ui), nil })
Register("event", func(ui cli.Ui) (cli.Command, error) { return event.New(ui), nil })
Register("exec", func(ui cli.Ui) (cli.Command, error) { return exec.New(ui, MakeShutdownCh()), nil })
Register("force-leave", func(ui cli.Ui) (cli.Command, error) { return forceleave.New(ui), nil })

View File

@ -0,0 +1,55 @@
package envoy
type templateArgs struct {
ProxyCluster, ProxyID string
AgentHTTPAddress string
AgentHTTPPort string
AgentTLS bool
AgentCAFile string
AdminBindAddress string
AdminBindPort string
LocalAgentClusterName string
Token string
}
const bootstrapTemplate = `
# Bootstrap Config for Consul Connect
# Generated by consul connect envoy
admin:
access_log_path: /dev/null
address:
socket_address:
address: "{{ .AdminBindAddress }}"
port_value: {{ .AdminBindPort }}
node:
cluster: "{{ .ProxyCluster }}"
id: "{{ .ProxyID }}"
static_resources:
clusters:
- name: "{{ .LocalAgentClusterName }}"
connect_timeout: 1s
type: STATIC
{{ if .AgentTLS -}}
tls_context:
common_tls_context:
validation_context:
trusted_ca:
filename: {{ .AgentCAFile }}
{{- end }}
http2_protocol_options: {}
hosts:
- socket_address:
address: "{{ .AgentHTTPAddress }}"
port_value: {{ .AgentHTTPPort }}
dynamic_resources:
lds_config: {ads: {}}
cds_config: {ads: {}}
ads_config:
api_type: GRPC
grpc_services:
initial_metadata:
- key: x-consul-token
value: "{{ .Token }}"
envoy_grpc:
cluster_name: "{{ .LocalAgentClusterName }}"
`

View File

@ -0,0 +1,301 @@
package envoy
import (
"bytes"
"flag"
"fmt"
"html/template"
"net"
"os"
"os/exec"
"strconv"
"strings"
"syscall"
proxyAgent "github.com/hashicorp/consul/agent/proxyprocess"
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/api"
proxyCmd "github.com/hashicorp/consul/command/connect/proxy"
"github.com/hashicorp/consul/command/flags"
"github.com/mitchellh/cli"
)
func New(ui cli.Ui) *cmd {
ui = &cli.PrefixedUi{
OutputPrefix: "==> ",
InfoPrefix: " ",
ErrorPrefix: "==> ",
Ui: ui,
}
c := &cmd{UI: ui}
c.init()
return c
}
type cmd struct {
UI cli.Ui
flags *flag.FlagSet
http *flags.HTTPFlags
help string
client *api.Client
// flags
proxyID string
sidecarFor string
adminBind string
envoyBin string
bootstrap bool
grpcAddr string
}
func (c *cmd) init() {
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
c.flags.StringVar(&c.proxyID, "proxy-id", "",
"The proxy's ID on the local agent.")
c.flags.StringVar(&c.sidecarFor, "sidecar-for", "",
"The ID of a service instance on the local agent that this proxy should "+
"become a sidecar for. It requires that the proxy service is registered "+
"with the agent as a connect-proxy with Proxy.DestinationServiceID set "+
"to this value. If more than one such proxy is registered it will fail.")
c.flags.StringVar(&c.envoyBin, "envoy-binary", "",
"The full path to the envoy binary to run. By default will just search "+
"$PATH. Ignored if -bootstrap is used.")
c.flags.StringVar(&c.adminBind, "admin-bind", "localhost:19000",
"The address:port to start envoy's admin server on. Envoy requires this "+
"but care must be taked to ensure it's not exposed to untrusted network "+
"as it has full control over the secrets and config of the proxy.")
c.flags.BoolVar(&c.bootstrap, "bootstrap", false,
"Generate the bootstrap.yaml but don't exec envoy")
c.flags.StringVar(&c.grpcAddr, "grpc-addr", "",
"Set the agent's gRPC address and port (in http(s)://host:port format). "+
"Alternatively, you can specify CONSUL_GRPC_ADDR in ENV.")
c.http = &flags.HTTPFlags{}
flags.Merge(c.flags, c.http.ClientFlags())
c.help = flags.Usage(help, c.flags)
}
func (c *cmd) Run(args []string) int {
if err := c.flags.Parse(args); err != nil {
return 1
}
passThroughArgs := c.flags.Args()
// Load the proxy ID and token from env vars if they're set
if c.proxyID == "" {
c.proxyID = os.Getenv(proxyAgent.EnvProxyID)
}
if c.sidecarFor == "" {
c.sidecarFor = os.Getenv(proxyAgent.EnvSidecarFor)
}
if c.grpcAddr == "" {
c.grpcAddr = os.Getenv(api.GRPCAddrEnvName)
}
if c.grpcAddr == "" {
c.UI.Error("Either -grpc-addr or CONSUL_GRPC_ADDR must be specified")
return 1
}
if c.http.Token() == "" {
// Extra check needed since CONSUL_HTTP_TOKEN has not been consulted yet but
// calling SetToken with empty will force that to override the
if proxyToken := os.Getenv(proxyAgent.EnvProxyToken); proxyToken != "" {
c.http.SetToken(proxyToken)
}
}
// Setup Consul client
client, err := c.http.APIClient()
if err != nil {
c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1
}
c.client = client
// See if we need to lookup proxyID
if c.proxyID == "" && c.sidecarFor != "" {
proxyID, err := c.lookupProxyIDForSidecar()
if err != nil {
c.UI.Error(err.Error())
return 1
}
c.proxyID = proxyID
}
if c.proxyID == "" {
c.UI.Error("No proxy ID specified. One of -proxy-id or -sidecar-for is " +
"required")
return 1
}
// Generate config
bootstrapYaml, err := c.generateConfig()
if err != nil {
c.UI.Error(err.Error())
return 1
}
if c.bootstrap {
// Just output it and we are done
fmt.Println(bootstrapYaml)
return 0
}
// Find Envoy binary
binary, err := c.findBinary()
if err != nil {
c.UI.Error("Couldn't find envoy binary: " + err.Error())
return 1
}
// First argument needs to be the executable name.
// TODO(banks): passing config including an ACL token on command line is jank
// - this is world readable. It's easiest thing for now. Temp files are kinda
// gross in a different way - we can limit to same-user access which is much
// better but we are leaving the ACL secret on disk unencrypted for an
// uncontrolled amount of time and in a location the operator doesn't even
// know about. Envoy doesn't support reading bootstrap from stdin or ENV
envoyArgs := []string{binary, "--config-yaml", bootstrapYaml}
envoyArgs = append(envoyArgs, passThroughArgs...)
// Exec
err = syscall.Exec(binary, envoyArgs, os.Environ())
if err != nil {
c.UI.Error("Failed to exec envoy: " + err.Error())
return 1
}
return 0
}
func (c *cmd) findBinary() (string, error) {
if c.envoyBin != "" {
return c.envoyBin, nil
}
return exec.LookPath("envoy")
}
// TODO(banks) this method ended up with a few subtleties that should be unit
// tested.
func (c *cmd) generateConfig() (string, error) {
var t = template.Must(template.New("bootstrap").Parse(bootstrapTemplate))
httpCfg := api.DefaultConfig()
c.http.MergeOntoConfig(httpCfg)
// Decide on TLS if the scheme is provided and indicates it, if the HTTP env
// suggests TLS is supported explicitly (CONSUL_HTTP_SSL) or implicitly
// (CONSUL_HTTP_ADDR) is https://
useTLS := false
if strings.HasPrefix(strings.ToLower(c.grpcAddr), "https://") {
useTLS = true
} else if useSSLEnv := os.Getenv(api.HTTPSSLEnvName); useSSLEnv != "" {
if enabled, err := strconv.ParseBool(useSSLEnv); err != nil {
useTLS = enabled
}
} else if strings.HasPrefix(strings.ToLower(httpCfg.Address), "https://") {
useTLS = true
}
// We want to allow grpcAddr set as host:port with no scheme but if the host
// is an IP this will fail to parse as a URL with "parse 127.0.0.1:8500: first
// path segment in URL cannot contain colon". On the other hand we also
// support both http(s)://host:port and unix:///path/to/file.
addrPort := strings.TrimPrefix(c.grpcAddr, "http://")
addrPort = strings.TrimPrefix(c.grpcAddr, "https://")
agentAddr, agentPort, err := net.SplitHostPort(addrPort)
if err != nil {
return "", fmt.Errorf("Invalid Consul HTTP address: %s", err)
}
if agentAddr == "" {
agentAddr = "127.0.0.1"
}
// We use STATIC for agent which means we need to resolve DNS names like
// `localhost` ourselves. We could use STRICT_DNS or DYNAMIC_DNS with envoy
// but Envoy resolves `localhost` differently to go on macOS at least which
// causes paper cuts like default dev agent (which binds specifically to
// 127.0.0.1) isn't reachable since Envoy resolves localhost to `[::]` and
// can't connect.
agentIP, err := net.ResolveIPAddr("ip", agentAddr)
if err != nil {
return "", fmt.Errorf("Failed to resolve agent address: %s", err)
}
adminAddr, adminPort, err := net.SplitHostPort(c.adminBind)
if err != nil {
return "", fmt.Errorf("Invalid Consul HTTP address: %s", err)
}
// Envoy requires IP addresses to bind too when using static so resolve DNS or
// localhost here.
adminBindIP, err := net.ResolveIPAddr("ip", adminAddr)
if err != nil {
return "", fmt.Errorf("Failed to resolve admin bind address: %s", err)
}
args := templateArgs{
ProxyCluster: c.proxyID,
ProxyID: c.proxyID,
AgentHTTPAddress: agentIP.String(),
AgentHTTPPort: agentPort,
AgentTLS: useTLS,
AgentCAFile: httpCfg.TLSConfig.CAFile,
AdminBindAddress: adminBindIP.String(),
AdminBindPort: adminPort,
Token: httpCfg.Token,
LocalAgentClusterName: xds.LocalAgentClusterName,
}
var buf bytes.Buffer
err = t.Execute(&buf, args)
if err != nil {
return "", err
}
return buf.String(), nil
}
func (c *cmd) lookupProxyIDForSidecar() (string, error) {
return proxyCmd.LookupProxyIDForSidecar(c.client, c.sidecarFor)
}
func (c *cmd) Synopsis() string {
return synopsis
}
func (c *cmd) Help() string {
return c.help
}
const synopsis = "Runs or Configures Envoy as a Connect proxy"
const help = `
Usage: consul connect envoy [options]
Generates the bootstrap configuration needed to start an Envoy proxy instance
for use as a Connect sidecar for a particular service instance. By default it
will generate the config and then exec Envoy directly until it exits normally.
It will search $PATH for the envoy binary but this can be overridden with
-envoy-binary.
It can instead only generate the bootstrap.yaml based on the current ENV and
arguments using -bootstrap.
The proxy requires service:write permissions for the service it represents.
The token may be passed via the CLI or the CONSUL_TOKEN environment
variable.
The example below shows how to start a local proxy as a sidecar to a "web"
service instance. It assumes that the proxy was already registered with it's
Config for example via a sidecar_service block.
$ consul connect envoy -sidecar-for web
`

View File

@ -0,0 +1,13 @@
package envoy
import (
"strings"
"testing"
)
func TestCatalogCommand_noTabs(t *testing.T) {
t.Parallel()
if strings.ContainsRune(New(nil).Help(), '\t') {
t.Fatal("help has tabs")
}
}

View File

@ -212,27 +212,37 @@ func (c *cmd) Run(args []string) int {
}
func (c *cmd) lookupProxyIDForSidecar(client *api.Client) (string, error) {
return LookupProxyIDForSidecar(client, c.sidecarFor)
}
// LookupProxyIDForSidecar finds candidate local proxy registrations that are a
// sidcar for the given service. It will return an ID if and only if there is
// exactly one registed connect proxy with `Proxy.DestinationServiceID` set to
// the specified service ID.
//
// This is exported to share it with the connect envoy command.
func LookupProxyIDForSidecar(client *api.Client, sidecarFor string) (string, error) {
svcs, err := client.Agent().Services()
if err != nil {
return "", fmt.Errorf("Failed looking up sidecar proxy info for %s: %s",
c.sidecarFor, err)
sidecarFor, err)
}
var proxyIDs []string
for _, svc := range svcs {
if svc.Kind == api.ServiceKindConnectProxy && svc.Proxy != nil &&
strings.ToLower(svc.Proxy.DestinationServiceID) == c.sidecarFor {
strings.ToLower(svc.Proxy.DestinationServiceID) == sidecarFor {
proxyIDs = append(proxyIDs, svc.ID)
}
}
if len(proxyIDs) == 0 {
return "", fmt.Errorf("No sidecar proxy registereded for %s", c.sidecarFor)
return "", fmt.Errorf("No sidecar proxy registereded for %s", sidecarFor)
}
if len(proxyIDs) > 1 {
return "", fmt.Errorf("More than one sidecar proxy registereded for %s.\n"+
" Start proxy with -proxy-id and one of the following IDs: %s",
c.sidecarFor, strings.Join(proxyIDs, ", "))
sidecarFor, strings.Join(proxyIDs, ", "))
}
return proxyIDs[0], nil
}

View File

@ -1,6 +1,7 @@
package proxy
import (
"strings"
"testing"
"time"
@ -184,3 +185,10 @@ func testConfig(t *testing.T, cw proxy.ConfigWatcher) *proxy.Config {
return nil // satisfy compiler
}
}
func TestCatalogCommand_noTabs(t *testing.T) {
t.Parallel()
if strings.ContainsRune(New(nil, nil).Help(), '\t') {
t.Fatal("help has tabs")
}
}

105
logger/grpc.go Normal file
View File

@ -0,0 +1,105 @@
package logger
import (
"fmt"
"log"
)
// GRPCLogger wrapps a *log.Logger and implements the grpclog.LoggerV2 interface
// allowing gRPC servers to log to the standard Consul logger.
type GRPCLogger struct {
level string
l *log.Logger
}
// NewGRPCLogger creates a grpclog.LoggerV2 that will output to the supplied
// logger with Severity/Verbosity level appropriate for the given config.
//
// Note that grpclog has Info, Warning, Error, Fatal severity levels AND integer
// verbosity levels for additional info. Verbose logs in glog are always INFO
// severity so we map Info,V0 to INFO, Info,V1 to DEBUG, and Info,V>1 to TRACE.
func NewGRPCLogger(config *Config, logger *log.Logger) *GRPCLogger {
return &GRPCLogger{
level: config.LogLevel,
l: logger,
}
}
// Info implements grpclog.LoggerV2
func (g *GRPCLogger) Info(args ...interface{}) {
args = append([]interface{}{"[INFO] "}, args...)
g.l.Print(args...)
}
// Infoln implements grpclog.LoggerV2
func (g *GRPCLogger) Infoln(args ...interface{}) {
g.Info(fmt.Sprintln(args...))
}
// Infof implements grpclog.LoggerV2
func (g *GRPCLogger) Infof(format string, args ...interface{}) {
g.Info(fmt.Sprintf(format, args...))
}
// Warning implements grpclog.LoggerV2
func (g *GRPCLogger) Warning(args ...interface{}) {
args = append([]interface{}{"[WARN] "}, args...)
g.l.Print(args...)
}
// Warningln implements grpclog.LoggerV2
func (g *GRPCLogger) Warningln(args ...interface{}) {
g.Warning(fmt.Sprintln(args...))
}
// Warningf implements grpclog.LoggerV2
func (g *GRPCLogger) Warningf(format string, args ...interface{}) {
g.Warning(fmt.Sprintf(format, args...))
}
// Error implements grpclog.LoggerV2
func (g *GRPCLogger) Error(args ...interface{}) {
args = append([]interface{}{"[ERR] "}, args...)
g.l.Print(args...)
}
// Errorln implements grpclog.LoggerV2
func (g *GRPCLogger) Errorln(args ...interface{}) {
g.Error(fmt.Sprintln(args...))
}
// Errorf implements grpclog.LoggerV2
func (g *GRPCLogger) Errorf(format string, args ...interface{}) {
g.Error(fmt.Sprintf(format, args...))
}
// Fatal implements grpclog.LoggerV2
func (g *GRPCLogger) Fatal(args ...interface{}) {
args = append([]interface{}{"[ERR] "}, args...)
g.l.Fatal(args...)
}
// Fatalln implements grpclog.LoggerV2
func (g *GRPCLogger) Fatalln(args ...interface{}) {
g.Fatal(fmt.Sprintln(args...))
}
// Fatalf implements grpclog.LoggerV2
func (g *GRPCLogger) Fatalf(format string, args ...interface{}) {
g.Fatal(fmt.Sprintf(format, args...))
}
// V implements grpclog.LoggerV2
func (g *GRPCLogger) V(l int) bool {
switch g.level {
case "TRACE":
// Enable ALL the verbosity!
return true
case "DEBUG":
return l < 2
case "INFO":
return l < 1
default:
return false
}
}

94
logger/grpc_test.go Normal file
View File

@ -0,0 +1,94 @@
package logger
import (
"bytes"
"fmt"
"log"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/grpclog"
)
func TestGRPCLogger(t *testing.T) {
var out bytes.Buffer
// No flags so we don't have to include date/time in expected output
logger := log.New(&out, "", 0)
grpclog.SetLoggerV2(NewGRPCLogger(&Config{LogLevel: "TRACE"}, logger))
// All of these should output something
grpclog.Info("Info,")
grpclog.Infoln("Infoln")
grpclog.Infof("Infof: %d\n", 1)
grpclog.Warning("Warning,")
grpclog.Warningln("Warningln")
grpclog.Warningf("Warningf: %d\n", 1)
grpclog.Error("Error,")
grpclog.Errorln("Errorln")
grpclog.Errorf("Errorf: %d\n", 1)
// Fatal tests are hard... assume they are good!
expect := `[INFO] Info,
[INFO] Infoln
[INFO] Infof: 1
[WARN] Warning,
[WARN] Warningln
[WARN] Warningf: 1
[ERR] Error,
[ERR] Errorln
[ERR] Errorf: 1
`
require.Equal(t, expect, out.String())
}
func TestGRPCLogger_V(t *testing.T) {
tests := []struct {
level string
v int
want bool
}{
{"ERR", -1, false},
{"ERR", 0, false},
{"ERR", 1, false},
{"ERR", 2, false},
{"ERR", 3, false},
{"WARN", -1, false},
{"WARN", 0, false},
{"WARN", 1, false},
{"WARN", 2, false},
{"WARN", 3, false},
{"INFO", -1, true},
{"INFO", 0, true},
{"INFO", 1, false},
{"INFO", 2, false},
{"INFO", 3, false},
{"DEBUG", -1, true},
{"DEBUG", 0, true},
{"DEBUG", 1, true},
{"DEBUG", 2, false},
{"DEBUG", 3, false},
{"TRACE", -1, true},
{"TRACE", 0, true},
{"TRACE", 1, true},
{"TRACE", 2, true},
{"TRACE", 3, true},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%s,%d", tt.level, tt.v), func(t *testing.T) {
var out bytes.Buffer
// No flags so we don't have to include date/time in expected output
logger := log.New(&out, "", 0)
grpclog.SetLoggerV2(NewGRPCLogger(&Config{LogLevel: tt.level}, logger))
assert.Equal(t, tt.want, grpclog.V(tt.v))
})
}
}