mirror of https://github.com/hashicorp/consul
Check ACLs more often for xDS endpoints.
For established xDS gRPC streams recheck ACLs for each DiscoveryRequest or DiscoveryResponse. If more than 5 minutes has elapsed since the last ACL check, recheck even without an incoming DiscoveryRequest or DiscoveryResponse. ACL failures will terminate the stream.pull/5237/head
parent
2dea3e2bd7
commit
d3eb781384
|
@ -522,6 +522,8 @@ func (a *Agent) listenAndServeGRPC() error {
|
|||
Authz: a,
|
||||
ResolveToken: a.resolveToken,
|
||||
}
|
||||
a.xdsServer.Initialize()
|
||||
|
||||
var err error
|
||||
a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile)
|
||||
if err != nil {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
@ -56,6 +57,10 @@ const (
|
|||
// LocalAgentClusterName is the name we give the local agent "cluster" in
|
||||
// Envoy config.
|
||||
LocalAgentClusterName = "local_agent"
|
||||
|
||||
// DefaultAuthCheckFrequency is the default value for
|
||||
// Server.AuthCheckFrequency to use when the zero value is provided.
|
||||
DefaultAuthCheckFrequency = 5 * time.Minute
|
||||
)
|
||||
|
||||
// ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far
|
||||
|
@ -90,6 +95,18 @@ type Server struct {
|
|||
CfgMgr ConfigManager
|
||||
Authz ConnectAuthz
|
||||
ResolveToken ACLResolverFunc
|
||||
// AuthCheckFrequency is how often we should re-check the credentials used
|
||||
// during a long-lived gRPC Stream after it has been initially established.
|
||||
// This is only used during idle periods of stream interactions (i.e. when
|
||||
// there has been no recent DiscoveryRequest).
|
||||
AuthCheckFrequency time.Duration
|
||||
}
|
||||
|
||||
// Initialize will finish configuring the Server for first use.
|
||||
func (s *Server) Initialize() {
|
||||
if s.AuthCheckFrequency == 0 {
|
||||
s.AuthCheckFrequency = DefaultAuthCheckFrequency
|
||||
}
|
||||
}
|
||||
|
||||
// StreamAggregatedResources implements
|
||||
|
@ -126,7 +143,7 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {
|
|||
|
||||
const (
|
||||
stateInit int = iota
|
||||
statePendingAuth
|
||||
statePendingInitialConfig
|
||||
stateRunning
|
||||
)
|
||||
|
||||
|
@ -176,8 +193,44 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
|||
},
|
||||
}
|
||||
|
||||
var authTimer <-chan time.Time
|
||||
extendAuthTimer := func() {
|
||||
authTimer = time.After(s.AuthCheckFrequency)
|
||||
}
|
||||
|
||||
checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error {
|
||||
if cfgSnap == nil {
|
||||
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
|
||||
}
|
||||
|
||||
token := tokenFromStream(stream)
|
||||
rule, err := s.ResolveToken(token)
|
||||
|
||||
if acl.IsErrNotFound(err) {
|
||||
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
|
||||
} else if acl.IsErrPermissionDenied(err) {
|
||||
return status.Errorf(codes.PermissionDenied, "permission denied: %v", err)
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
|
||||
return status.Errorf(codes.PermissionDenied, "permission denied")
|
||||
}
|
||||
|
||||
// Authed OK!
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-authTimer:
|
||||
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
|
||||
if err := checkStreamACLs(cfgSnap); err != nil {
|
||||
return err
|
||||
}
|
||||
extendAuthTimer()
|
||||
|
||||
case req, ok = <-reqCh:
|
||||
if !ok {
|
||||
// reqCh is closed when stream.Recv errors which is how we detect client
|
||||
|
@ -218,27 +271,27 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
|||
defer watchCancel()
|
||||
|
||||
// Now wait for the config so we can check ACL
|
||||
state = statePendingAuth
|
||||
case statePendingAuth:
|
||||
state = statePendingInitialConfig
|
||||
case statePendingInitialConfig:
|
||||
if cfgSnap == nil {
|
||||
// Nothing we can do until we get the initial config
|
||||
continue
|
||||
}
|
||||
// Got config, try to authenticate
|
||||
token := tokenFromStream(stream)
|
||||
rule, err := s.ResolveToken(token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
|
||||
return status.Errorf(codes.PermissionDenied, "permission denied")
|
||||
}
|
||||
// Authed OK!
|
||||
|
||||
// Got config, try to authenticate next.
|
||||
state = stateRunning
|
||||
|
||||
// Lets actually process the config we just got or we'll mis responding
|
||||
fallthrough
|
||||
case stateRunning:
|
||||
// Check ACLs on every Discovery{Request,Response}.
|
||||
if err := checkStreamACLs(cfgSnap); err != nil {
|
||||
return err
|
||||
}
|
||||
// For the first time through the state machine, this is when the
|
||||
// timer is first started.
|
||||
extendAuthTimer()
|
||||
|
||||
// See if any handlers need to have the current (possibly new) config
|
||||
// sent. Note the order here is actually significant so we can't just
|
||||
// range the map which has no determined order. It's important because:
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"text/template"
|
||||
"time"
|
||||
|
@ -115,7 +116,13 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
|
|||
envoy := NewTestEnvoy(t, "web-sidecar-proxy", "")
|
||||
defer envoy.Close()
|
||||
|
||||
s := Server{logger, mgr, mgr, aclResolve}
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
CfgMgr: mgr,
|
||||
Authz: mgr,
|
||||
ResolveToken: aclResolve,
|
||||
}
|
||||
s.Initialize()
|
||||
|
||||
go func() {
|
||||
err := s.StreamAggregatedResources(envoy.stream)
|
||||
|
@ -589,7 +596,13 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
|
|||
envoy := NewTestEnvoy(t, "web-sidecar-proxy", tt.token)
|
||||
defer envoy.Close()
|
||||
|
||||
s := Server{logger, mgr, mgr, aclResolve}
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
CfgMgr: mgr,
|
||||
Authz: mgr,
|
||||
ResolveToken: aclResolve,
|
||||
}
|
||||
s.Initialize()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
|
@ -632,6 +645,196 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuringDiscoveryRequest(t *testing.T) {
|
||||
aclRules := `service "web" { policy = "write" }`
|
||||
token := "service-write-on-web"
|
||||
|
||||
policy, err := acl.NewPolicyFromSource("", 0, aclRules, acl.SyntaxLegacy, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
var validToken atomic.Value
|
||||
validToken.Store(token)
|
||||
|
||||
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||
mgr := newTestManager(t)
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
if token := validToken.Load(); token == nil || id != token.(string) {
|
||||
return nil, acl.ErrNotFound
|
||||
}
|
||||
|
||||
return acl.NewPolicyAuthorizer(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
|
||||
}
|
||||
envoy := NewTestEnvoy(t, "web-sidecar-proxy", token)
|
||||
defer envoy.Close()
|
||||
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
CfgMgr: mgr,
|
||||
Authz: mgr,
|
||||
ResolveToken: aclResolve,
|
||||
AuthCheckFrequency: 1 * time.Hour, // make sure this doesn't kick in
|
||||
}
|
||||
s.Initialize()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- s.StreamAggregatedResources(envoy.stream)
|
||||
}()
|
||||
|
||||
getError := func() (gotErr error, ok bool) {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return err, true
|
||||
default:
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
|
||||
// Register the proxy to create state needed to Watch() on
|
||||
mgr.RegisterProxy(t, "web-sidecar-proxy")
|
||||
|
||||
// Send initial cluster discover (OK)
|
||||
envoy.SendReq(t, ClusterType, 0, 0)
|
||||
{
|
||||
err, ok := getError()
|
||||
require.NoError(t, err)
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
// Check no response sent yet
|
||||
assertChanBlocked(t, envoy.stream.sendCh)
|
||||
{
|
||||
err, ok := getError()
|
||||
require.NoError(t, err)
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
// Deliver a new snapshot
|
||||
snap := proxycfg.TestConfigSnapshot(t)
|
||||
mgr.DeliverConfig(t, "web-sidecar-proxy", snap)
|
||||
|
||||
assertResponseSent(t, envoy.stream.sendCh, expectClustersJSON(t, snap, token, 1, 1))
|
||||
|
||||
// Now nuke the ACL token.
|
||||
validToken.Store("")
|
||||
|
||||
// It also (in parallel) issues the next cluster request (which acts as an ACK
|
||||
// of the version we sent)
|
||||
envoy.SendReq(t, ClusterType, 1, 1)
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
require.Error(t, err)
|
||||
gerr, ok := status.FromError(err)
|
||||
require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err)
|
||||
require.Equal(t, codes.Unauthenticated, gerr.Code())
|
||||
require.Equal(t, "unauthenticated: ACL not found", gerr.Message())
|
||||
|
||||
mgr.AssertWatchCancelled(t, "web-sidecar-proxy")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for handler to finish")
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBackground(t *testing.T) {
|
||||
aclRules := `service "web" { policy = "write" }`
|
||||
token := "service-write-on-web"
|
||||
|
||||
policy, err := acl.NewPolicyFromSource("", 0, aclRules, acl.SyntaxLegacy, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
var validToken atomic.Value
|
||||
validToken.Store(token)
|
||||
|
||||
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||
mgr := newTestManager(t)
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
if token := validToken.Load(); token == nil || id != token.(string) {
|
||||
return nil, acl.ErrNotFound
|
||||
}
|
||||
|
||||
return acl.NewPolicyAuthorizer(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
|
||||
}
|
||||
envoy := NewTestEnvoy(t, "web-sidecar-proxy", token)
|
||||
defer envoy.Close()
|
||||
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
CfgMgr: mgr,
|
||||
Authz: mgr,
|
||||
ResolveToken: aclResolve,
|
||||
AuthCheckFrequency: 100 * time.Millisecond, // Make this short.
|
||||
}
|
||||
s.Initialize()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- s.StreamAggregatedResources(envoy.stream)
|
||||
}()
|
||||
|
||||
getError := func() (gotErr error, ok bool) {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return err, true
|
||||
default:
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
|
||||
// Register the proxy to create state needed to Watch() on
|
||||
mgr.RegisterProxy(t, "web-sidecar-proxy")
|
||||
|
||||
// Send initial cluster discover (OK)
|
||||
envoy.SendReq(t, ClusterType, 0, 0)
|
||||
{
|
||||
err, ok := getError()
|
||||
require.NoError(t, err)
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
// Check no response sent yet
|
||||
assertChanBlocked(t, envoy.stream.sendCh)
|
||||
{
|
||||
err, ok := getError()
|
||||
require.NoError(t, err)
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
// Deliver a new snapshot
|
||||
snap := proxycfg.TestConfigSnapshot(t)
|
||||
mgr.DeliverConfig(t, "web-sidecar-proxy", snap)
|
||||
|
||||
assertResponseSent(t, envoy.stream.sendCh, expectClustersJSON(t, snap, token, 1, 1))
|
||||
|
||||
// It also (in parallel) issues the next cluster request (which acts as an ACK
|
||||
// of the version we sent)
|
||||
envoy.SendReq(t, ClusterType, 1, 1)
|
||||
|
||||
// Check no response sent yet
|
||||
assertChanBlocked(t, envoy.stream.sendCh)
|
||||
{
|
||||
err, ok := getError()
|
||||
require.NoError(t, err)
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
// Now nuke the ACL token while there's no activity.
|
||||
validToken.Store("")
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
require.Error(t, err)
|
||||
gerr, ok := status.FromError(err)
|
||||
require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err)
|
||||
require.Equal(t, codes.Unauthenticated, gerr.Code())
|
||||
require.Equal(t, "unauthenticated: ACL not found", gerr.Message())
|
||||
|
||||
mgr.AssertWatchCancelled(t, "web-sidecar-proxy")
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for handler to finish")
|
||||
}
|
||||
}
|
||||
|
||||
// This tests the ext_authz service method that implements connect authz.
|
||||
func TestServer_Check(t *testing.T) {
|
||||
|
||||
|
@ -729,7 +932,13 @@ func TestServer_Check(t *testing.T) {
|
|||
envoy := NewTestEnvoy(t, "web-sidecar-proxy", token)
|
||||
defer envoy.Close()
|
||||
|
||||
s := Server{logger, mgr, mgr, aclResolve}
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
CfgMgr: mgr,
|
||||
Authz: mgr,
|
||||
ResolveToken: aclResolve,
|
||||
}
|
||||
s.Initialize()
|
||||
|
||||
// Create a context with the correct token
|
||||
ctx := metadata.NewIncomingContext(context.Background(),
|
||||
|
|
Loading…
Reference in New Issue