mirror of https://github.com/hashicorp/consul
commit 95930e3cb7

acl/cache.go (35 changed lines)
@@ -8,7 +8,7 @@ import (
 )
 
 // FaultFunc is a function used to fault in the parent,
-// rules for an ACL given it's ID
+// rules for an ACL given its ID
 type FaultFunc func(id string) (string, string, error)
 
 // aclEntry allows us to store the ACL with it's policy ID
@@ -21,9 +21,9 @@ type aclEntry struct {
 // Cache is used to implement policy and ACL caching
 type Cache struct {
     faultfn     FaultFunc
-    aclCache    *lru.Cache // Cache id -> acl
-    policyCache *lru.Cache // Cache policy -> acl
-    ruleCache   *lru.Cache // Cache rules -> policy
+    aclCache    *lru.TwoQueueCache // Cache id -> acl
+    policyCache *lru.TwoQueueCache // Cache policy -> acl
+    ruleCache   *lru.TwoQueueCache // Cache rules -> policy
 }
 
 // NewCache constructs a new policy and ACL cache of a given size
@@ -31,9 +31,22 @@ func NewCache(size int, faultfn FaultFunc) (*Cache, error) {
     if size <= 0 {
         return nil, fmt.Errorf("Must provide positive cache size")
     }
-    rc, _ := lru.New(size)
-    pc, _ := lru.New(size)
-    ac, _ := lru.New(size)
+
+    rc, err := lru.New2Q(size)
+    if err != nil {
+        return nil, err
+    }
+
+    pc, err := lru.New2Q(size)
+    if err != nil {
+        return nil, err
+    }
+
+    ac, err := lru.New2Q(size)
+    if err != nil {
+        return nil, err
+    }
+
     c := &Cache{
         faultfn:     faultfn,
         aclCache:    ac,
@@ -46,7 +59,7 @@ func NewCache(size int, faultfn FaultFunc) (*Cache, error) {
 // GetPolicy is used to get a potentially cached policy set.
 // If not cached, it will be parsed, and then cached.
 func (c *Cache) GetPolicy(rules string) (*Policy, error) {
-    return c.getPolicy(c.ruleID(rules), rules)
+    return c.getPolicy(RuleID(rules), rules)
 }
 
 // getPolicy is an internal method to get a cached policy,
@@ -66,8 +79,8 @@ func (c *Cache) getPolicy(id, rules string) (*Policy, error) {
 }
 
-// ruleID is used to generate an ID for a rule
-func (c *Cache) ruleID(rules string) string {
+// RuleID is used to generate an ID for a rule
+func RuleID(rules string) string {
     return fmt.Sprintf("%x", md5.Sum([]byte(rules)))
 }
 
@@ -112,7 +125,7 @@ func (c *Cache) GetACL(id string) (ACL, error) {
     if err != nil {
         return nil, err
     }
-    ruleID := c.ruleID(rules)
+    ruleID := RuleID(rules)
 
     // Check for a compiled ACL
     policyID := c.policyID(parentID, ruleID)
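Note on the change above: the plain LRU caches are swapped for the 2Q caches from hashicorp/golang-lru, whose constructor validates its size and returns an error, hence the new error plumbing in NewCache. A minimal standalone sketch of that API, assuming the golang-lru package as it existed around this commit (the key and value here are made up):

package main

import (
    "fmt"

    lru "github.com/hashicorp/golang-lru"
)

func main() {
    // New2Q can fail on a bad size, which is why NewCache above now
    // threads the error through instead of the old rc, _ := lru.New(size).
    cache, err := lru.New2Q(128)
    if err != nil {
        panic(err)
    }

    // A 2Q cache tracks recently *and* frequently used entries, so a
    // burst of one-off token lookups is less likely to evict hot ACLs.
    cache.Add("acl-id", "compiled policy")
    if v, ok := cache.Get("acl-id"); ok {
        fmt.Println(v)
    }
}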
acl/cache_test.go (file header lost in the mirror; this and the other unnamed files below are inferred from their contents)

@@ -5,7 +5,7 @@ import (
 )
 
 func TestCache_GetPolicy(t *testing.T) {
-    c, err := NewCache(1, nil)
+    c, err := NewCache(2, nil)
     if err != nil {
         t.Fatalf("err: %v", err)
     }
@@ -24,11 +24,23 @@ func TestCache_GetPolicy(t *testing.T) {
         t.Fatalf("should be cached")
     }
 
-    // Cache a new policy
+    // Work with some new policies to evict the original one
     _, err = c.GetPolicy(testSimplePolicy)
     if err != nil {
         t.Fatalf("err: %v", err)
     }
+    _, err = c.GetPolicy(testSimplePolicy)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    _, err = c.GetPolicy(testSimplePolicy2)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    _, err = c.GetPolicy(testSimplePolicy2)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
 
     // Test invalidation of p
     p3, err := c.GetPolicy("")
@@ -44,12 +56,13 @@ func TestCache_GetACL(t *testing.T) {
     policies := map[string]string{
         "foo": testSimplePolicy,
         "bar": testSimplePolicy2,
+        "baz": testSimplePolicy3,
     }
     faultfn := func(id string) (string, string, error) {
         return "deny", policies[id], nil
     }
 
-    c, err := NewCache(1, faultfn)
+    c, err := NewCache(2, faultfn)
     if err != nil {
         t.Fatalf("err: %v", err)
     }
@@ -80,6 +93,18 @@ func TestCache_GetACL(t *testing.T) {
     if err != nil {
         t.Fatalf("err: %v", err)
     }
+    _, err = c.GetACL("bar")
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    _, err = c.GetACL("baz")
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    _, err = c.GetACL("baz")
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
 
     acl3, err := c.GetACL("foo")
     if err != nil {
@@ -100,7 +125,7 @@ func TestCache_ClearACL(t *testing.T) {
         return "deny", policies[id], nil
     }
 
-    c, err := NewCache(1, faultfn)
+    c, err := NewCache(16, faultfn)
     if err != nil {
         t.Fatalf("err: %v", err)
     }
@@ -135,7 +160,7 @@ func TestCache_Purge(t *testing.T) {
         return "deny", policies[id], nil
     }
 
-    c, err := NewCache(1, faultfn)
+    c, err := NewCache(16, faultfn)
     if err != nil {
         t.Fatalf("err: %v", err)
     }
@@ -167,7 +192,7 @@ func TestCache_GetACLPolicy(t *testing.T) {
     faultfn := func(id string) (string, string, error) {
         return "deny", policies[id], nil
     }
-    c, err := NewCache(1, faultfn)
+    c, err := NewCache(16, faultfn)
     if err != nil {
         t.Fatalf("err: %v", err)
     }
@@ -220,7 +245,7 @@ func TestCache_GetACL_Parent(t *testing.T) {
         return "", "", nil
     }
 
-    c, err := NewCache(1, faultfn)
+    c, err := NewCache(16, faultfn)
     if err != nil {
         t.Fatalf("err: %v", err)
     }
@@ -296,3 +321,8 @@ key "bar/" {
     policy = "read"
 }
 `
+var testSimplePolicy3 = `
+key "baz/" {
+    policy = "read"
+}
+`
command/agent/acl_endpoint.go

@@ -205,3 +205,20 @@ func (s *HTTPServer) ACLList(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
     }
     return out.ACLs, nil
 }
+
+func (s *HTTPServer) ACLReplicationStatus(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+    // Note that we do not forward to the ACL DC here. This is a query for
+    // any DC that's doing replication.
+    args := structs.DCSpecificRequest{}
+    s.parseSource(req, &args.Source)
+    if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
+        return nil, nil
+    }
+
+    // Make the request.
+    var out structs.ACLReplicationStatus
+    if err := s.agent.RPC("ACL.ReplicationStatus", &args, &out); err != nil {
+        return nil, err
+    }
+    return out, nil
+}
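The handler above exposes replication status at /v1/acl/replication. A hedged sketch of querying it from Go against a local agent; the address and the assumption that the status struct serializes its Go field names as JSON are mine, not shown in this diff:

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

// Mirrors the fields the test below checks (Enabled, Running,
// SourceDatacenter) plus ReplicatedIndex; JSON names are assumed.
type replicationStatus struct {
    Enabled          bool
    Running          bool
    SourceDatacenter string
    ReplicatedIndex  uint64
}

func main() {
    resp, err := http.Get("http://127.0.0.1:8500/v1/acl/replication")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var status replicationStatus
    if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", status)
}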
command/agent/acl_endpoint_test.go

@@ -218,3 +218,18 @@ func TestACLList(t *testing.T) {
         }
     })
 }
+
+func TestACLReplicationStatus(t *testing.T) {
+    httpTest(t, func(srv *HTTPServer) {
+        req, err := http.NewRequest("GET", "/v1/acl/replication", nil)
+        resp := httptest.NewRecorder()
+        obj, err := srv.ACLReplicationStatus(resp, req)
+        if err != nil {
+            t.Fatalf("err: %v", err)
+        }
+        _, ok := obj.(structs.ACLReplicationStatus)
+        if !ok {
+            t.Fatalf("should work")
+        }
+    })
+}
command/agent/agent.go

@@ -339,6 +339,9 @@ func (a *Agent) consulConfig() *consul.Config {
     if a.config.ACLDownPolicy != "" {
         base.ACLDownPolicy = a.config.ACLDownPolicy
     }
+    if a.config.ACLReplicationToken != "" {
+        base.ACLReplicationToken = a.config.ACLReplicationToken
+    }
     if a.config.SessionTTLMinRaw != "" {
         base.SessionTTLMin = a.config.SessionTTLMin
     }
command/agent/config.go

@@ -452,6 +452,12 @@ type Config struct {
     // this acts like deny.
     ACLDownPolicy string `mapstructure:"acl_down_policy"`
 
+    // ACLReplicationToken is used to fetch ACLs from the ACLDatacenter in
+    // order to replicate them locally. Setting this to a non-empty value
+    // also enables replication. Replication is only available in datacenters
+    // other than the ACLDatacenter.
+    ACLReplicationToken string `mapstructure:"acl_replication_token" json:"-"`
+
     // Watches are used to monitor various endpoints and to invoke a
     // handler to act appropriately. These are managed entirely in the
     // agent layer using the standard APIs.
@@ -1319,6 +1325,9 @@ func MergeConfig(a, b *Config) *Config {
     if b.ACLDefaultPolicy != "" {
         result.ACLDefaultPolicy = b.ACLDefaultPolicy
     }
+    if b.ACLReplicationToken != "" {
+        result.ACLReplicationToken = b.ACLReplicationToken
+    }
     if len(b.Watches) != 0 {
         result.Watches = append(result.Watches, b.Watches...)
     }
command/agent/config_test.go

@@ -622,7 +622,8 @@ func TestDecodeConfig(t *testing.T) {
     // ACLs
     input = `{"acl_token": "1234", "acl_datacenter": "dc2",
         "acl_ttl": "60s", "acl_down_policy": "deny",
-        "acl_default_policy": "deny", "acl_master_token": "2345"}`
+        "acl_default_policy": "deny", "acl_master_token": "2345",
+        "acl_replication_token": "8675309"}`
     config, err = DecodeConfig(bytes.NewReader([]byte(input)))
     if err != nil {
         t.Fatalf("err: %s", err)
@@ -646,6 +647,9 @@ func TestDecodeConfig(t *testing.T) {
     if config.ACLDefaultPolicy != "deny" {
         t.Fatalf("bad: %#v", config)
     }
+    if config.ACLReplicationToken != "8675309" {
+        t.Fatalf("bad: %#v", config)
+    }
 
     // Watches
     input = `{"watches": [{"type":"keyprefix", "prefix":"foo/", "handler":"foobar"}]}`
@@ -1432,6 +1436,7 @@ func TestMergeConfig(t *testing.T) {
         ACLTTLRaw:           "15s",
         ACLDownPolicy:       "deny",
         ACLDefaultPolicy:    "deny",
+        ACLReplicationToken: "8765309",
         Watches: []map[string]interface{}{
             map[string]interface{}{
                 "type": "keyprefix",
command/agent/http.go

@@ -257,6 +257,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
         s.mux.HandleFunc("/v1/acl/info/", s.wrap(s.ACLGet))
         s.mux.HandleFunc("/v1/acl/clone/", s.wrap(s.ACLClone))
         s.mux.HandleFunc("/v1/acl/list", s.wrap(s.ACLList))
+        s.mux.HandleFunc("/v1/acl/replication", s.wrap(s.ACLReplicationStatus))
     } else {
         s.mux.HandleFunc("/v1/acl/create", s.wrap(aclDisabled))
         s.mux.HandleFunc("/v1/acl/update", s.wrap(aclDisabled))
@@ -264,6 +265,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
         s.mux.HandleFunc("/v1/acl/info/", s.wrap(aclDisabled))
         s.mux.HandleFunc("/v1/acl/clone/", s.wrap(aclDisabled))
         s.mux.HandleFunc("/v1/acl/list", s.wrap(aclDisabled))
+        s.mux.HandleFunc("/v1/acl/replication", s.wrap(aclDisabled))
     }
 
     s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral))
consul/acl.go (133 changed lines)

@@ -37,14 +37,14 @@ const (
     redactedToken = "<hidden>"
 
     // Maximum number of cached ACL entries
-    aclCacheSize = 256
+    aclCacheSize = 10 * 1024
 )
 
 var (
     permissionDeniedErr = errors.New(permissionDenied)
 )
 
-// aclCacheEntry is used to cache non-authoritative ACL's
+// aclCacheEntry is used to cache non-authoritative ACLs
 // If non-authoritative, then we must respect a TTL
 type aclCacheEntry struct {
     ACL acl.ACL
@@ -52,9 +52,14 @@ type aclCacheEntry struct {
     ETag    string
 }
 
-// aclFault is used to fault in the rules for an ACL if we take a miss
-func (s *Server) aclFault(id string) (string, string, error) {
+// aclLocalFault is used by the authoritative ACL cache to fault in the rules
+// for an ACL if we take a miss. This goes directly to the state store, so it
+// assumes its running in the ACL datacenter, or in a non-ACL datacenter when
+// using its replicated ACLs during an outage.
+func (s *Server) aclLocalFault(id string) (string, string, error) {
     defer metrics.MeasureSince([]string{"consul", "acl", "fault"}, time.Now())
 
+    // Query the state store.
     state := s.fsm.State()
     _, acl, err := state.ACLGet(id)
     if err != nil {
@@ -64,19 +69,23 @@ func (s *Server) aclFault(id string) (string, string, error) {
         return "", "", errors.New(aclNotFound)
     }
 
-    // Management tokens have no policy and inherit from the
-    // 'manage' root policy
+    // Management tokens have no policy and inherit from the 'manage' root
+    // policy.
     if acl.Type == structs.ACLTypeManagement {
         return "manage", "", nil
     }
 
-    // Otherwise use the base policy
+    // Otherwise use the default policy.
     return s.config.ACLDefaultPolicy, acl.Rules, nil
 }
 
-// resolveToken is used to resolve an ACL is any is appropriate
+// resolveToken is the primary interface used by ACL-checkers (such as an
+// endpoint handling a request) to resolve a token. If ACLs aren't enabled
+// then this will return a nil token, otherwise it will attempt to use local
+// cache and ultimately the ACL datacenter to get the policy associated with the
+// token.
 func (s *Server) resolveToken(id string) (acl.ACL, error) {
-    // Check if there is no ACL datacenter (ACL's disabled)
+    // Check if there is no ACL datacenter (ACLs disabled)
     authDC := s.config.ACLDatacenter
     if len(authDC) == 0 {
         return nil, nil
@@ -103,38 +112,45 @@ func (s *Server) resolveToken(id string) (acl.ACL, error) {
 // rpcFn is used to make an RPC call to the client or server.
 type rpcFn func(string, interface{}, interface{}) error
 
-// aclCache is used to cache ACL's and policies.
+// aclCache is used to cache ACLs and policies.
 type aclCache struct {
     config *Config
     logger *log.Logger
 
-    // acls is a non-authoritative ACL cache
-    acls *lru.Cache
+    // acls is a non-authoritative ACL cache.
+    acls *lru.TwoQueueCache
 
-    // aclPolicyCache is a policy cache
-    policies *lru.Cache
+    // aclPolicyCache is a non-authoritative policy cache.
+    policies *lru.TwoQueueCache
 
-    // The RPC function used to talk to the client/server
+    // rpc is a function used to talk to the client/server.
     rpc rpcFn
+
+    // local is a function used to look for an ACL locally if replication is
+    // enabled. This will be nil if replication isn't enabled.
+    local acl.FaultFunc
 }
 
-// newAclCache returns a new cache layer for ACLs and policies
-func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error) {
+// newAclCache returns a new non-authoritative cache for ACLs. This is used for
+// performance, and is used inside the ACL datacenter on non-leader servers, and
+// outside the ACL datacenter everywhere.
+func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFunc) (*aclCache, error) {
     var err error
     cache := &aclCache{
         config: conf,
         logger: logger,
         rpc:    rpc,
+        local:  local,
     }
 
     // Initialize the non-authoritative ACL cache
-    cache.acls, err = lru.New(aclCacheSize)
+    cache.acls, err = lru.New2Q(aclCacheSize)
     if err != nil {
         return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
     }
 
     // Initialize the ACL policy cache
-    cache.policies, err = lru.New(aclCacheSize)
+    cache.policies, err = lru.New2Q(aclCacheSize)
     if err != nil {
         return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err)
     }
@@ -142,17 +158,16 @@ func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error)
     return cache, nil
 }
 
-// lookupACL is used when we are non-authoritative, and need
-// to resolve an ACL
+// lookupACL is used when we are non-authoritative, and need to resolve an ACL.
 func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
-    // Check the cache for the ACL
+    // Check the cache for the ACL.
     var cached *aclCacheEntry
     raw, ok := c.acls.Get(id)
     if ok {
         cached = raw.(*aclCacheEntry)
     }
 
-    // Check for live cache
+    // Check for live cache.
     if cached != nil && time.Now().Before(cached.Expires) {
         metrics.IncrCounter([]string{"consul", "acl", "cache_hit"}, 1)
         return cached.ACL, nil
@@ -160,7 +175,7 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
         metrics.IncrCounter([]string{"consul", "acl", "cache_miss"}, 1)
     }
 
-    // Attempt to refresh the policy
+    // Attempt to refresh the policy from the ACL datacenter via an RPC.
     args := structs.ACLPolicyRequest{
         Datacenter: authDC,
         ACL:        id,
@@ -168,29 +183,69 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
     if cached != nil {
         args.ETag = cached.ETag
     }
-    var out structs.ACLPolicy
-    err := c.rpc("ACL.GetPolicy", &args, &out)
-
-    // Handle the happy path
+
+    var reply structs.ACLPolicy
+    err := c.rpc("ACL.GetPolicy", &args, &reply)
     if err == nil {
-        return c.useACLPolicy(id, authDC, cached, &out)
+        return c.useACLPolicy(id, authDC, cached, &reply)
     }
 
-    // Check for not-found
+    // Check for not-found, which will cause us to bail immediately. For any
+    // other error we report it in the logs but can continue.
     if strings.Contains(err.Error(), aclNotFound) {
         return nil, errors.New(aclNotFound)
     } else {
-        s := id
-        // Print last 3 chars of the token if long enough, otherwise completly hide it
-        if len(s) > 3 {
-            s = fmt.Sprintf("token ending in '%s'", s[len(s)-3:])
-        } else {
-            s = redactedToken
-        }
-        c.logger.Printf("[ERR] consul.acl: Failed to get policy for %s: %v", s, err)
+        c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err)
     }
 
-    // Unable to refresh, apply the down policy
+    // TODO (slackpad) - We could do a similar thing *within* the ACL
+    // datacenter if the leader isn't available. We have a local state
+    // store of the ACLs, so by populating the local member in this cache,
+    // it would fall back to the state store if there was a leader loss and
+    // the extend-cache policy was true. This feels subtle to explain and
+    // configure, and leader blips should be paved over by cache already, so
+    // we won't do this for now but should consider for the future. This is
+    // a lot different than the replication story where you might be cut off
+    // from the ACL datacenter for an extended period of time and need to
+    // carry on operating with the full set of ACLs as they were known
+    // before the partition.
+
+    // At this point we might have an expired cache entry and we know that
+    // there was a problem getting the ACL from the ACL datacenter. If a
+    // local ACL fault function is registered to query replicated ACL data,
+    // and the user's policy allows it, we will try locally before we give
+    // up.
+    if c.local != nil && c.config.ACLDownPolicy == "extend-cache" {
+        parent, rules, err := c.local(id)
+        if err != nil {
+            // We don't make an exception here for ACLs that aren't
+            // found locally. It seems more robust to use an expired
+            // cached entry (if we have one) rather than ignore it
+            // for the case that replication was a bit behind and
+            // didn't have the ACL yet.
+            c.logger.Printf("[DEBUG] consul.acl: Failed to get policy from replicated ACLs: %v", err)
+            goto ACL_DOWN
+        }
+
+        policy, err := acl.Parse(rules)
+        if err != nil {
+            c.logger.Printf("[DEBUG] consul.acl: Failed to parse policy for replicated ACL: %v", err)
+            goto ACL_DOWN
+        }
+        policy.ID = acl.RuleID(rules)
+
+        // Fake up an ACL datacenter reply and inject it into the cache.
+        // Note we use the local TTL here, so this'll be used for that
+        // amount of time even once the ACL datacenter becomes available.
+        metrics.IncrCounter([]string{"consul", "acl", "replication_hit"}, 1)
+        reply.ETag = makeACLETag(parent, policy)
+        reply.TTL = c.config.ACLTTL
+        reply.Parent = parent
+        reply.Policy = policy
+        return c.useACLPolicy(id, authDC, cached, &reply)
+    }
+
+ACL_DOWN:
+    // Unable to refresh, apply the down policy.
     switch c.config.ACLDownPolicy {
     case "allow":
         return acl.AllowAll(), nil
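The local fallback above hinges on the acl package's FaultFunc hook from the first file in this diff. A standalone sketch of that indirection, with a made-up in-memory map standing in for Consul's replicated state store:

package main

import "fmt"

// FaultFunc mirrors the hook defined in acl/cache.go above: given an ACL ID,
// return the parent policy name and the raw rules for it.
type FaultFunc func(id string) (string, string, error)

func main() {
    // A hypothetical replicated store; Server.aclLocalFault reads the
    // real one from the FSM state store instead.
    store := map[string]string{
        "anonymous": `key "" { policy = "read" }`,
    }

    var local FaultFunc = func(id string) (string, string, error) {
        rules, ok := store[id]
        if !ok {
            return "", "", fmt.Errorf("acl not found")
        }
        return "deny", rules, nil
    }

    parent, rules, err := local("anonymous")
    fmt.Println(parent, rules, err)
}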
consul/acl_endpoint.go

@@ -15,24 +15,15 @@ type ACL struct {
     srv *Server
 }
 
-// Apply is used to apply a modifying request to the data store. This should
-// only be used for operations that modify the data
-func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
-    if done, err := a.srv.forward("ACL.Apply", args, args, reply); done {
-        return err
-    }
-    defer metrics.MeasureSince([]string{"consul", "acl", "apply"}, time.Now())
-
-    // Verify we are allowed to serve this request
-    if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {
-        return fmt.Errorf(aclDisabled)
-    }
-
-    // Verify token is permitted to modify ACLs
-    if acl, err := a.srv.resolveToken(args.Token); err != nil {
-        return err
-    } else if acl == nil || !acl.ACLModify() {
-        return permissionDeniedErr
+// aclApplyInternal is used to apply an ACL request after it has been vetted that
+// this is a valid operation. It is used when users are updating ACLs, in which
+// case we check their token to make sure they have management privileges. It is
+// also used for ACL replication. We want to run the replicated ACLs through the
+// same checks on the change itself.
+func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) error {
+    // All ACLs must have an ID by this point.
+    if args.ACL.ID == "" {
+        return fmt.Errorf("Missing ACL ID")
     }
 
     switch args.Op {
@@ -56,33 +47,8 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
             return fmt.Errorf("ACL rule compilation failed: %v", err)
         }
 
-        // If no ID is provided, generate a new ID. This must
-        // be done prior to appending to the raft log, because the ID is not
-        // deterministic. Once the entry is in the log, the state update MUST
-        // be deterministic or the followers will not converge.
-        if args.ACL.ID == "" {
-            state := a.srv.fsm.State()
-            for {
-                if args.ACL.ID, err = uuid.GenerateUUID(); err != nil {
-                    a.srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err)
-                    return err
-                }
-
-                _, acl, err := state.ACLGet(args.ACL.ID)
-                if err != nil {
-                    a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
-                    return err
-                }
-                if acl == nil {
-                    break
-                }
-            }
-        }
-
     case structs.ACLDelete:
-        if args.ACL.ID == "" {
-            return fmt.Errorf("Missing ACL ID")
-        } else if args.ACL.ID == anonymousToken {
+        if args.ACL.ID == anonymousToken {
             return fmt.Errorf("%s: Cannot delete anonymous token", permissionDenied)
         }
 
@@ -91,24 +57,78 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
     }
 
     // Apply the update
-    resp, err := a.srv.raftApply(structs.ACLRequestType, args)
+    resp, err := srv.raftApply(structs.ACLRequestType, args)
     if err != nil {
-        a.srv.logger.Printf("[ERR] consul.acl: Apply failed: %v", err)
+        srv.logger.Printf("[ERR] consul.acl: Apply failed: %v", err)
         return err
     }
     if respErr, ok := resp.(error); ok {
         return respErr
     }
 
+    // Check if the return type is a string
+    if respString, ok := resp.(string); ok {
+        *reply = respString
+    }
+
+    return nil
+}
+
+// Apply is used to apply a modifying request to the data store. This should
+// only be used for operations that modify the data
+func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
+    if done, err := a.srv.forward("ACL.Apply", args, args, reply); done {
+        return err
+    }
+    defer metrics.MeasureSince([]string{"consul", "acl", "apply"}, time.Now())
+
+    // Verify we are allowed to serve this request
+    if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {
+        return fmt.Errorf(aclDisabled)
+    }
+
+    // Verify token is permitted to modify ACLs
+    if acl, err := a.srv.resolveToken(args.Token); err != nil {
+        return err
+    } else if acl == nil || !acl.ACLModify() {
+        return permissionDeniedErr
+    }
+
+    // If no ID is provided, generate a new ID. This must be done prior to
+    // appending to the Raft log, because the ID is not deterministic. Once
+    // the entry is in the log, the state update MUST be deterministic or
+    // the followers will not converge.
+    if args.Op == structs.ACLSet && args.ACL.ID == "" {
+        state := a.srv.fsm.State()
+        for {
+            var err error
+            args.ACL.ID, err = uuid.GenerateUUID()
+            if err != nil {
+                a.srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err)
+                return err
+            }
+
+            _, acl, err := state.ACLGet(args.ACL.ID)
+            if err != nil {
+                a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
+                return err
+            }
+            if acl == nil {
+                break
+            }
+        }
+    }
+
+    // Do the apply now that this update is vetted.
+    if err := aclApplyInternal(a.srv, args, reply); err != nil {
+        return err
+    }
+
     // Clear the cache if applicable
     if args.ACL.ID != "" {
         a.srv.aclAuthCache.ClearACL(args.ACL.ID)
     }
 
-    // Check if the return type is a string
-    if respString, ok := resp.(string); ok {
-        *reply = respString
-    }
     return nil
 }
 
@@ -145,6 +165,11 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
     })
 }
 
+// makeACLETag returns an ETag for the given parent and policy.
+func makeACLETag(parent string, policy *acl.Policy) string {
+    return fmt.Sprintf("%s:%s", parent, policy.ID)
+}
+
 // GetPolicy is used to retrieve a compiled policy object with a TTL. Does not
 // support a blocking query.
 func (a *ACL) GetPolicy(args *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error {
@@ -165,7 +190,7 @@ func (a *ACL) GetPolicy(args *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error {
 
     // Generate an ETag
     conf := a.srv.config
-    etag := fmt.Sprintf("%s:%s", parent, policy.ID)
+    etag := makeACLETag(parent, policy)
 
     // Setup the response
     reply.ETag = etag
@@ -214,3 +239,25 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
         return nil
     })
 }
+
+// ReplicationStatus is used to retrieve the current ACL replication status.
+func (a *ACL) ReplicationStatus(args *structs.DCSpecificRequest,
+    reply *structs.ACLReplicationStatus) error {
+    // This must be sent to the leader, so we fix the args since we are
+    // re-using a structure where we don't support all the options.
+    args.RequireConsistent = true
+    args.AllowStale = false
+    if done, err := a.srv.forward("ACL.ReplicationStatus", args, args, reply); done {
+        return err
+    }
+
+    // There's no ACL token required here since this doesn't leak any
+    // sensitive information, and we don't want people to have to use
+    // management tokens if they are querying this via a health check.
+
+    // Poll the latest status.
+    a.srv.aclReplicationStatusLock.RLock()
+    *reply = a.srv.aclReplicationStatus
+    a.srv.aclReplicationStatusLock.RUnlock()
+    return nil
+}
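makeACLETag above keys a compiled policy by "parent:policyID", so a caller that presents a matching ETag can be told its cached policy is still fresh without re-shipping or re-parsing rules. A toy sketch of that conditional-fetch idea; these names are illustrative, not Consul's:

package main

import "fmt"

// getPolicy stands in for an ACL.GetPolicy-style call: if the caller's
// cached ETag still matches, the body can be omitted entirely.
func getPolicy(serverETag, cachedETag string) (body string, notModified bool) {
    if serverETag == cachedETag {
        return "", true
    }
    return "freshly compiled policy", false
}

func main() {
    etag := "deny:5c9b0f3e" // parent:policyID, per makeACLETag above
    if _, same := getPolicy(etag, etag); same {
        fmt.Println("cached policy still valid; skip re-parse")
    }
}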
consul/acl_endpoint_test.go

@@ -466,3 +466,29 @@ func TestACLEndpoint_List_Denied(t *testing.T) {
         t.Fatalf("err: %v", err)
     }
 }
+
+func TestACLEndpoint_ReplicationStatus(t *testing.T) {
+    dir1, s1 := testServerWithConfig(t, func(c *Config) {
+        c.ACLDatacenter = "dc2"
+        c.ACLReplicationToken = "secret"
+        c.ACLReplicationInterval = 0
+    })
+    defer os.RemoveAll(dir1)
+    defer s1.Shutdown()
+    codec := rpcClient(t, s1)
+    defer codec.Close()
+
+    testutil.WaitForLeader(t, s1.RPC, "dc1")
+
+    getR := structs.DCSpecificRequest{
+        Datacenter: "dc1",
+    }
+    var status structs.ACLReplicationStatus
+    err := msgpackrpc.CallWithCodec(codec, "ACL.ReplicationStatus", &getR, &status)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    if !status.Enabled || !status.Running || status.SourceDatacenter != "dc2" {
+        t.Fatalf("bad: %#v", status)
+    }
+}
consul/acl_replication.go (new file; all 348 lines are additions)

package consul

import (
    "fmt"
    "sort"
    "time"

    "github.com/armon/go-metrics"
    "github.com/hashicorp/consul/consul/structs"
    "github.com/hashicorp/consul/lib"
)

// aclIterator simplifies the algorithm below by providing a basic iterator that
// moves through a list of ACLs and returns nil when it's exhausted. It also has
// methods for pre-sorting the ACLs being iterated over by ID, which should
// already be true, but since this is crucial for correctness and we are taking
// input from other servers, we sort to make sure.
type aclIterator struct {
    acls structs.ACLs

    // index is the current position of the iterator.
    index int
}

// newACLIterator returns a new ACL iterator.
func newACLIterator(acls structs.ACLs) *aclIterator {
    return &aclIterator{acls: acls}
}

// See sort.Interface.
func (a *aclIterator) Len() int {
    return len(a.acls)
}

// See sort.Interface.
func (a *aclIterator) Swap(i, j int) {
    a.acls[i], a.acls[j] = a.acls[j], a.acls[i]
}

// See sort.Interface.
func (a *aclIterator) Less(i, j int) bool {
    return a.acls[i].ID < a.acls[j].ID
}

// Front returns the item at index position, or nil if the list is exhausted.
func (a *aclIterator) Front() *structs.ACL {
    if a.index < len(a.acls) {
        return a.acls[a.index]
    } else {
        return nil
    }
}

// Next advances the iterator to the next index.
func (a *aclIterator) Next() {
    a.index++
}

// reconcileACLs takes the local and remote ACL state, and produces a list of
// changes required in order to bring the local ACLs into sync with the remote
// ACLs. You can supply lastRemoteIndex as a hint that replication has succeeded
// up to that remote index and it will make this process more efficient by only
// comparing ACL entries modified after that index. Setting this to 0 will force
// a full compare of all existing ACLs.
func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests {
    // Since sorting the lists is crucial for correctness, we are depending
    // on data coming from other servers potentially running a different,
    // version of Consul, and sorted-ness is kind of a subtle property of
    // the state store indexing, it's prudent to make sure things are sorted
    // before we begin.
    localIter, remoteIter := newACLIterator(local), newACLIterator(remote)
    sort.Sort(localIter)
    sort.Sort(remoteIter)

    // Run through both lists and reconcile them.
    var changes structs.ACLRequests
    for localIter.Front() != nil || remoteIter.Front() != nil {
        // If the local list is exhausted, then process this as a remote
        // add. We know from the loop condition that there's something
        // in the remote list.
        if localIter.Front() == nil {
            changes = append(changes, &structs.ACLRequest{
                Op:  structs.ACLSet,
                ACL: *(remoteIter.Front()),
            })
            remoteIter.Next()
            continue
        }

        // If the remote list is exhausted, then process this as a local
        // delete. We know from the loop condition that there's something
        // in the local list.
        if remoteIter.Front() == nil {
            changes = append(changes, &structs.ACLRequest{
                Op:  structs.ACLDelete,
                ACL: *(localIter.Front()),
            })
            localIter.Next()
            continue
        }

        // At this point we know there's something at the front of each
        // list we need to resolve.

        // If the remote list has something local doesn't, we add it.
        if localIter.Front().ID > remoteIter.Front().ID {
            changes = append(changes, &structs.ACLRequest{
                Op:  structs.ACLSet,
                ACL: *(remoteIter.Front()),
            })
            remoteIter.Next()
            continue
        }

        // If local has something remote doesn't, we delete it.
        if localIter.Front().ID < remoteIter.Front().ID {
            changes = append(changes, &structs.ACLRequest{
                Op:  structs.ACLDelete,
                ACL: *(localIter.Front()),
            })
            localIter.Next()
            continue
        }

        // Local and remote have an ACL with the same ID, so we might
        // need to compare them.
        l, r := localIter.Front(), remoteIter.Front()
        if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) {
            changes = append(changes, &structs.ACLRequest{
                Op:  structs.ACLSet,
                ACL: *r,
            })
        }
        localIter.Next()
        remoteIter.Next()
    }
    return changes
}

// FetchLocalACLs returns the ACLs in the local state store.
func (s *Server) fetchLocalACLs() (structs.ACLs, error) {
    _, local, err := s.fsm.State().ACLList()
    if err != nil {
        return nil, err
    }
    return local, nil
}

// FetchRemoteACLs is used to get the remote set of ACLs from the ACL
// datacenter. The lastIndex parameter is a hint about which remote index we
// have replicated to, so this is expected to block until something changes.
func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
    defer metrics.MeasureSince([]string{"consul", "leader", "fetchRemoteACLs"}, time.Now())

    args := structs.DCSpecificRequest{
        Datacenter: s.config.ACLDatacenter,
        QueryOptions: structs.QueryOptions{
            Token:         s.config.ACLReplicationToken,
            MinQueryIndex: lastRemoteIndex,
            AllowStale:    true,
        },
    }
    var remote structs.IndexedACLs
    if err := s.RPC("ACL.List", &args, &remote); err != nil {
        return nil, err
    }
    return &remote, nil
}

// UpdateLocalACLs is given a list of changes to apply in order to bring the
// local ACLs in-line with the remote ACLs from the ACL datacenter.
func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
    defer metrics.MeasureSince([]string{"consul", "leader", "updateLocalACLs"}, time.Now())

    minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
    for _, change := range changes {
        // Note that we are using the single ACL interface here and not
        // performing all this inside a single transaction. This is OK
        // for two reasons. First, there's nothing else other than this
        // replication routine that alters the local ACLs, so there's
        // nothing to contend with locally. Second, if an apply fails
        // in the middle (most likely due to losing leadership), the
        // next replication pass will clean up and check everything
        // again.
        var reply string
        start := time.Now()
        if err := aclApplyInternal(s, change, &reply); err != nil {
            return err
        }

        // Do a smooth rate limit to wait out the min time allowed for
        // each op. If this op took longer than the min, then the sleep
        // time will be negative and we will just move on.
        elapsed := time.Now().Sub(start)
        time.Sleep(minTimePerOp - elapsed)
    }
    return nil
}

// replicateACLs is a runs one pass of the algorithm for replicating ACLs from
// a remote ACL datacenter to local state. If there's any error, this will return
// 0 for the lastRemoteIndex, which will cause us to immediately do a full sync
// next time.
func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) {
    remote, err := s.fetchRemoteACLs(lastRemoteIndex)
    if err != nil {
        return 0, fmt.Errorf("failed to retrieve remote ACLs: %v", err)
    }

    // This will be pretty common because we will be blocking for a long time
    // and may have lost leadership, so lets control the message here instead
    // of returning deeper error messages from from Raft.
    if !s.IsLeader() {
        return 0, fmt.Errorf("no longer cluster leader")
    }

    // Measure everything after the remote query, which can block for long
    // periods of time. This metric is a good measure of how expensive the
    // replication process is.
    defer metrics.MeasureSince([]string{"consul", "leader", "replicateACLs"}, time.Now())

    local, err := s.fetchLocalACLs()
    if err != nil {
        return 0, fmt.Errorf("failed to retrieve local ACLs: %v", err)
    }

    // If the remote index ever goes backwards, it's a good indication that
    // the remote side was rebuilt and we should do a full sync since we
    // can't make any assumptions about what's going on.
    if remote.QueryMeta.Index < lastRemoteIndex {
        s.logger.Printf("[WARN] consul: ACL replication remote index moved backwards (%d to %d), forcing a full ACL sync", lastRemoteIndex, remote.QueryMeta.Index)
        lastRemoteIndex = 0
    }

    // Calculate the changes required to bring the state into sync and then
    // apply them.
    changes := reconcileACLs(local, remote.ACLs, lastRemoteIndex)
    if err := s.updateLocalACLs(changes); err != nil {
        return 0, fmt.Errorf("failed to sync ACL changes: %v", err)
    }

    // Return the index we got back from the remote side, since we've synced
    // up with the remote state as of that index.
    return remote.QueryMeta.Index, nil
}

// IsACLReplicationEnabled returns true if ACL replication is enabled.
func (s *Server) IsACLReplicationEnabled() bool {
    authDC := s.config.ACLDatacenter
    return len(authDC) > 0 && (authDC != s.config.Datacenter) &&
        len(s.config.ACLReplicationToken) > 0
}

// updateACLReplicationStatus safely updates the ACL replication status.
func (s *Server) updateACLReplicationStatus(status structs.ACLReplicationStatus) {
    // Fixup the times to shed some useless precision to ease formattting,
    // and always report UTC.
    status.LastError = status.LastError.Round(time.Second).UTC()
    status.LastSuccess = status.LastSuccess.Round(time.Second).UTC()

    // Set the shared state.
    s.aclReplicationStatusLock.Lock()
    s.aclReplicationStatus = status
    s.aclReplicationStatusLock.Unlock()
}

// runACLReplication is a long-running goroutine that will attempt to replicate
// ACLs while the server is the leader, until the shutdown channel closes.
func (s *Server) runACLReplication() {
    var status structs.ACLReplicationStatus
    status.Enabled = true
    status.SourceDatacenter = s.config.ACLDatacenter
    s.updateACLReplicationStatus(status)

    // Show that it's not running on the way out.
    defer func() {
        status.Running = false
        s.updateACLReplicationStatus(status)
    }()

    // Give each server's replicator a random initial phase for good
    // measure.
    select {
    case <-s.shutdownCh:
        return

    case <-time.After(lib.RandomStagger(s.config.ACLReplicationInterval)):
    }

    // We are fairly conservative with the lastRemoteIndex so that after a
    // leadership change or an error we re-sync everything (we also don't
    // want to block the first time after one of these events so we can
    // show a successful sync in the status endpoint).
    var lastRemoteIndex uint64
    replicate := func() {
        if !status.Running {
            lastRemoteIndex = 0 // Re-sync everything.
            status.Running = true
            s.updateACLReplicationStatus(status)
            s.logger.Printf("[INFO] consul: ACL replication started")
        }

        index, err := s.replicateACLs(lastRemoteIndex)
        if err != nil {
            lastRemoteIndex = 0 // Re-sync everything.
            status.LastError = time.Now()
            s.updateACLReplicationStatus(status)
            s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err)
        } else {
            lastRemoteIndex = index
            status.ReplicatedIndex = index
            status.LastSuccess = time.Now()
            s.updateACLReplicationStatus(status)
            s.logger.Printf("[DEBUG] consul: ACL replication completed through remote index %d", index)
        }
    }
    pause := func() {
        if status.Running {
            lastRemoteIndex = 0 // Re-sync everything.
            status.Running = false
            s.updateACLReplicationStatus(status)
            s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)")
        }
    }

    // This will slowly poll to see if replication should be active. Once it
    // is and we've caught up, the replicate() call will begin to block and
    // only wake up when the query timer expires or there are new ACLs to
    // replicate. We've chosen this design so that the ACLReplicationInterval
    // is the lower bound for how quickly we will replicate, no matter how
    // much ACL churn is happening on the remote side.
    //
    // The blocking query inside replicate() respects the shutdown channel,
    // so we won't get stuck in here as things are torn down.
    for {
        select {
        case <-s.shutdownCh:
            return

        case <-time.After(s.config.ACLReplicationInterval):
            if s.IsLeader() {
                replicate()
            } else {
                pause()
            }
        }
    }
}
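updateLocalACLs above throttles replicated applies with a simple "minimum time per op" budget rather than a token bucket. A standalone sketch of the same pattern (the limit value here is hypothetical):

package main

import (
    "fmt"
    "time"
)

func main() {
    const applyLimit = 2 // hypothetical ops per second
    minTimePerOp := time.Second / time.Duration(applyLimit)

    for i := 0; i < 4; i++ {
        start := time.Now()
        // ... one replicated apply would go here ...

        // Sleeping for a negative duration returns immediately, so ops
        // slower than the budget are not penalized further.
        time.Sleep(minTimePerOp - time.Since(start))
        fmt.Println("op", i, "done")
    }
}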
consul/acl_replication_test.go (new file; all 443 lines are additions)

package consul

import (
    "fmt"
    "os"
    "reflect"
    "sort"
    "strconv"
    "strings"
    "testing"
    "time"

    "github.com/hashicorp/consul/consul/structs"
    "github.com/hashicorp/consul/testutil"
)

func TestACLReplication_Sorter(t *testing.T) {
    acls := structs.ACLs{
        &structs.ACL{ID: "a"},
        &structs.ACL{ID: "b"},
        &structs.ACL{ID: "c"},
    }

    sorter := &aclIterator{acls, 0}
    if len := sorter.Len(); len != 3 {
        t.Fatalf("bad: %d", len)
    }
    if !sorter.Less(0, 1) {
        t.Fatalf("should be less")
    }
    if sorter.Less(1, 0) {
        t.Fatalf("should not be less")
    }
    if !sort.IsSorted(sorter) {
        t.Fatalf("should be sorted")
    }

    expected := structs.ACLs{
        &structs.ACL{ID: "b"},
        &structs.ACL{ID: "a"},
        &structs.ACL{ID: "c"},
    }
    sorter.Swap(0, 1)
    if !reflect.DeepEqual(acls, expected) {
        t.Fatalf("bad: %v", acls)
    }
    if sort.IsSorted(sorter) {
        t.Fatalf("should not be sorted")
    }
    sort.Sort(sorter)
    if !sort.IsSorted(sorter) {
        t.Fatalf("should be sorted")
    }
}

func TestACLReplication_Iterator(t *testing.T) {
    acls := structs.ACLs{}

    iter := newACLIterator(acls)
    if front := iter.Front(); front != nil {
        t.Fatalf("bad: %v", front)
    }
    iter.Next()
    if front := iter.Front(); front != nil {
        t.Fatalf("bad: %v", front)
    }

    acls = structs.ACLs{
        &structs.ACL{ID: "a"},
        &structs.ACL{ID: "b"},
        &structs.ACL{ID: "c"},
    }
    iter = newACLIterator(acls)
    if front := iter.Front(); front != acls[0] {
        t.Fatalf("bad: %v", front)
    }
    iter.Next()
    if front := iter.Front(); front != acls[1] {
        t.Fatalf("bad: %v", front)
    }
    iter.Next()
    if front := iter.Front(); front != acls[2] {
        t.Fatalf("bad: %v", front)
    }
    iter.Next()
    if front := iter.Front(); front != nil {
        t.Fatalf("bad: %v", front)
    }
}

func TestACLReplication_reconcileACLs(t *testing.T) {
    parseACLs := func(raw string) structs.ACLs {
        var acls structs.ACLs
        for _, key := range strings.Split(raw, "|") {
            if len(key) == 0 {
                continue
            }

            tuple := strings.Split(key, ":")
            index, err := strconv.Atoi(tuple[1])
            if err != nil {
                t.Fatalf("err: %v", err)
            }
            acl := &structs.ACL{
                ID:    tuple[0],
                Rules: tuple[2],
                RaftIndex: structs.RaftIndex{
                    ModifyIndex: uint64(index),
                },
            }
            acls = append(acls, acl)
        }
        return acls
    }

    parseChanges := func(changes structs.ACLRequests) string {
        var ret string
        for i, change := range changes {
            if i > 0 {
                ret += "|"
            }
            ret += fmt.Sprintf("%s:%s:%s", change.Op, change.ACL.ID, change.ACL.Rules)
        }
        return ret
    }

    tests := []struct {
        local           string
        remote          string
        lastRemoteIndex uint64
        expected        string
    }{
        // Everything empty.
        {
            local:           "",
            remote:          "",
            lastRemoteIndex: 0,
            expected:        "",
        },
        // First time with empty local.
        {
            local:           "",
            remote:          "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            lastRemoteIndex: 0,
            expected:        "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
        },
        // Remote not sorted.
        {
            local:           "",
            remote:          "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
            lastRemoteIndex: 0,
            expected:        "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
        },
        // Neither side sorted.
        {
            local:           "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
            remote:          "ccc:9:X|bbb:3:X|ddd:2:X|eee:11:X",
            lastRemoteIndex: 0,
            expected:        "",
        },
        // Fully replicated, nothing to do.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            lastRemoteIndex: 0,
            expected:        "",
        },
        // Change an ACL.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
            lastRemoteIndex: 0,
            expected:        "set:ccc:Y",
        },
        // Change an ACL, but mask the change by the last replicated
        // index. This isn't how things work normally, but it proves
        // we are skipping the full compare based on the index.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
            lastRemoteIndex: 33,
            expected:        "",
        },
        // Empty everything out.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "",
            lastRemoteIndex: 0,
            expected:        "delete:bbb:X|delete:ccc:X|delete:ddd:X|delete:eee:X",
        },
        // Adds on the ends and in the middle.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "aaa:99:X|bbb:3:X|ccc:9:X|ccx:101:X|ddd:2:X|eee:11:X|fff:102:X",
            lastRemoteIndex: 0,
            expected:        "set:aaa:X|set:ccx:X|set:fff:X",
        },
        // Deletes on the ends and in the middle.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "ccc:9:X",
            lastRemoteIndex: 0,
            expected:        "delete:bbb:X|delete:ddd:X|delete:eee:X",
        },
        // Everything.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X",
            lastRemoteIndex: 11,
            expected:        "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X",
        },
    }
    for i, test := range tests {
        local, remote := parseACLs(test.local), parseACLs(test.remote)
        changes := reconcileACLs(local, remote, test.lastRemoteIndex)
        if actual := parseChanges(changes); actual != test.expected {
            t.Errorf("test case %d failed: %s", i, actual)
        }
    }
}

func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) {
    dir1, s1 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc2"
        c.ACLDatacenter = "dc1"
        c.ACLReplicationToken = "secret"
        c.ACLReplicationApplyLimit = 1
    })
    defer os.RemoveAll(dir1)
    defer s1.Shutdown()
    testutil.WaitForLeader(t, s1.RPC, "dc2")

    changes := structs.ACLRequests{
        &structs.ACLRequest{
            Op: structs.ACLSet,
            ACL: structs.ACL{
                ID:   "secret",
                Type: "client",
            },
        },
    }

    // Should be throttled to 1 Hz.
    start := time.Now()
    if err := s1.updateLocalACLs(changes); err != nil {
        t.Fatalf("err: %v", err)
    }
    if dur := time.Now().Sub(start); dur < time.Second {
        t.Fatalf("too slow: %9.6f", dur.Seconds())
    }

    changes = append(changes,
        &structs.ACLRequest{
            Op: structs.ACLSet,
            ACL: structs.ACL{
                ID:   "secret",
                Type: "client",
            },
        })

    // Should be throttled to 1 Hz.
    start = time.Now()
    if err := s1.updateLocalACLs(changes); err != nil {
        t.Fatalf("err: %v", err)
    }
    if dur := time.Now().Sub(start); dur < 2*time.Second {
        t.Fatalf("too fast: %9.6f", dur.Seconds())
    }
}

func TestACLReplication_IsACLReplicationEnabled(t *testing.T) {
    // ACLs not enabled.
    dir1, s1 := testServerWithConfig(t, func(c *Config) {
        c.ACLDatacenter = ""
    })
    defer os.RemoveAll(dir1)
    defer s1.Shutdown()
    if s1.IsACLReplicationEnabled() {
        t.Fatalf("should not be enabled")
    }

    // ACLs enabled but not replication.
    dir2, s2 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc2"
        c.ACLDatacenter = "dc1"
    })
    defer os.RemoveAll(dir2)
    defer s2.Shutdown()
    if s2.IsACLReplicationEnabled() {
        t.Fatalf("should not be enabled")
    }

    // ACLs enabled with replication.
    dir3, s3 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc2"
        c.ACLDatacenter = "dc1"
        c.ACLReplicationToken = "secret"
    })
    defer os.RemoveAll(dir3)
    defer s3.Shutdown()
    if !s3.IsACLReplicationEnabled() {
        t.Fatalf("should be enabled")
    }

    // ACLs enabled and replication token set, but inside the ACL datacenter
    // so replication should be disabled.
    dir4, s4 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc1"
        c.ACLDatacenter = "dc1"
        c.ACLReplicationToken = "secret"
    })
    defer os.RemoveAll(dir4)
    defer s4.Shutdown()
    if s4.IsACLReplicationEnabled() {
        t.Fatalf("should not be enabled")
    }
}

func TestACLReplication(t *testing.T) {
    dir1, s1 := testServerWithConfig(t, func(c *Config) {
        c.ACLDatacenter = "dc1"
        c.ACLMasterToken = "root"
    })
    defer os.RemoveAll(dir1)
    defer s1.Shutdown()
    client := rpcClient(t, s1)
    defer client.Close()

    dir2, s2 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc2"
        c.ACLDatacenter = "dc1"
        c.ACLReplicationToken = "root"
        c.ACLReplicationInterval = 0
        c.ACLReplicationApplyLimit = 1000000
    })
    defer os.RemoveAll(dir2)
    defer s2.Shutdown()

    // Try to join.
    addr := fmt.Sprintf("127.0.0.1:%d",
        s1.config.SerfWANConfig.MemberlistConfig.BindPort)
    if _, err := s2.JoinWAN([]string{addr}); err != nil {
        t.Fatalf("err: %v", err)
    }
    testutil.WaitForLeader(t, s1.RPC, "dc1")
    testutil.WaitForLeader(t, s1.RPC, "dc2")

    // Create a bunch of new tokens.
    var id string
    for i := 0; i < 1000; i++ {
        arg := structs.ACLRequest{
            Datacenter: "dc1",
            Op:         structs.ACLSet,
            ACL: structs.ACL{
                Name:  "User token",
                Type:  structs.ACLTypeClient,
                Rules: testACLPolicy,
            },
            WriteRequest: structs.WriteRequest{Token: "root"},
        }
        if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
            t.Fatalf("err: %v", err)
        }
    }

    checkSame := func() (bool, error) {
        index, remote, err := s1.fsm.State().ACLList()
        if err != nil {
            return false, err
        }
        _, local, err := s2.fsm.State().ACLList()
        if err != nil {
            return false, err
        }
        if len(remote) != len(local) {
            return false, nil
        }
        for i, acl := range remote {
            if !acl.IsSame(local[i]) {
                return false, nil
            }
        }

        var status structs.ACLReplicationStatus
        s2.aclReplicationStatusLock.RLock()
        status = s2.aclReplicationStatus
        s2.aclReplicationStatusLock.RUnlock()
        if !status.Enabled || !status.Running ||
            status.ReplicatedIndex != index ||
            status.SourceDatacenter != "dc1" {
            return false, nil
        }

        return true, nil
    }

    // Wait for the replica to converge.
    testutil.WaitForResult(checkSame, func(err error) {
        t.Fatalf("ACLs didn't converge")
    })

    // Create more new tokens.
    for i := 0; i < 1000; i++ {
        arg := structs.ACLRequest{
            Datacenter: "dc1",
            Op:         structs.ACLSet,
            ACL: structs.ACL{
                Name:  "User token",
                Type:  structs.ACLTypeClient,
                Rules: testACLPolicy,
            },
            WriteRequest: structs.WriteRequest{Token: "root"},
        }
        var dontCare string
        if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
            t.Fatalf("err: %v", err)
        }
    }

    // Wait for the replica to converge.
    testutil.WaitForResult(checkSame, func(err error) {
        t.Fatalf("ACLs didn't converge")
    })

    // Delete a token.
    arg := structs.ACLRequest{
        Datacenter: "dc1",
        Op:         structs.ACLDelete,
        ACL: structs.ACL{
            ID: id,
        },
        WriteRequest: structs.WriteRequest{Token: "root"},
    }
    var dontCare string
    if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
        t.Fatalf("err: %v", err)
    }

    // Wait for the replica to converge.
    testutil.WaitForResult(checkSame, func(err error) {
        t.Fatalf("ACLs didn't converge")
    })
}
consul/acl_test.go

@@ -614,6 +614,128 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) {
     }
 }
 
+func TestACL_Replication(t *testing.T) {
+    dir1, s1 := testServerWithConfig(t, func(c *Config) {
+        c.ACLDatacenter = "dc1"
+        c.ACLMasterToken = "root"
+    })
+    defer os.RemoveAll(dir1)
+    defer s1.Shutdown()
+    client := rpcClient(t, s1)
+    defer client.Close()
+
+    dir2, s2 := testServerWithConfig(t, func(c *Config) {
+        c.Datacenter = "dc2"
+        c.ACLDatacenter = "dc1"
+        c.ACLDefaultPolicy = "deny"
+        c.ACLDownPolicy = "extend-cache"
+        c.ACLReplicationToken = "root"
+        c.ACLReplicationInterval = 0
+        c.ACLReplicationApplyLimit = 1000000
+    })
+    defer os.RemoveAll(dir2)
+    defer s2.Shutdown()
+
+    dir3, s3 := testServerWithConfig(t, func(c *Config) {
+        c.Datacenter = "dc3"
+        c.ACLDatacenter = "dc1"
+        c.ACLDownPolicy = "deny"
+        c.ACLReplicationToken = "root"
+        c.ACLReplicationInterval = 0
+        c.ACLReplicationApplyLimit = 1000000
+    })
+    defer os.RemoveAll(dir3)
+    defer s3.Shutdown()
+
+    // Try to join.
+    addr := fmt.Sprintf("127.0.0.1:%d",
+        s1.config.SerfWANConfig.MemberlistConfig.BindPort)
+    if _, err := s2.JoinWAN([]string{addr}); err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    if _, err := s3.JoinWAN([]string{addr}); err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    testutil.WaitForLeader(t, s1.RPC, "dc1")
+    testutil.WaitForLeader(t, s1.RPC, "dc2")
+    testutil.WaitForLeader(t, s1.RPC, "dc3")
+
+    // Create a new token.
+    arg := structs.ACLRequest{
+        Datacenter: "dc1",
+        Op:         structs.ACLSet,
+        ACL: structs.ACL{
+            Name:  "User token",
+            Type:  structs.ACLTypeClient,
+            Rules: testACLPolicy,
+        },
+        WriteRequest: structs.WriteRequest{Token: "root"},
+    }
+    var id string
+    if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
+    // Wait for replication to occur.
+    testutil.WaitForResult(func() (bool, error) {
+        _, acl, err := s2.fsm.State().ACLGet(id)
+        if err != nil {
+            return false, err
+        }
+        if acl == nil {
+            return false, nil
+        }
+        _, acl, err = s3.fsm.State().ACLGet(id)
+        if err != nil {
+            return false, err
+        }
+        if acl == nil {
+            return false, nil
+        }
+        return true, nil
+    }, func(err error) {
+        t.Fatalf("ACLs didn't converge")
+    })
+
+    // Kill the ACL datacenter.
+    s1.Shutdown()
+
+    // Token should resolve on s2, which has replication + extend-cache.
+    acl, err := s2.resolveToken(id)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    if acl == nil {
+        t.Fatalf("missing acl")
+    }
+
+    // Check the policy
+    if acl.KeyRead("bar") {
+        t.Fatalf("unexpected read")
+    }
+    if !acl.KeyRead("foo/test") {
+        t.Fatalf("unexpected failed read")
+    }
+
+    // Although s3 has replication, and we verified that the ACL is there,
+    // it can not be used because of the down policy.
+    acl, err = s3.resolveToken(id)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    if acl == nil {
+        t.Fatalf("missing acl")
+    }
+
+    // Check the policy.
+    if acl.KeyRead("bar") {
+        t.Fatalf("unexpected read")
+    }
+    if acl.KeyRead("foo/test") {
+        t.Fatalf("unexpected read")
+    }
+}
+
 func TestACL_MultiDC_Found(t *testing.T) {
     dir1, s1 := testServerWithConfig(t, func(c *Config) {
         c.ACLDatacenter = "dc1"
@ -173,6 +173,24 @@ type Config struct {
    // "allow" can be used to allow all requests. This is not recommended.
    ACLDownPolicy string

    // ACLReplicationToken is used to fetch ACLs from the ACLDatacenter in
    // order to replicate them locally. Setting this to a non-empty value
    // also enables replication. Replication is only available in datacenters
    // other than the ACLDatacenter.
    ACLReplicationToken string

    // ACLReplicationInterval is the interval at which replication passes
    // will occur. Queries to the ACLDatacenter may block, so replication
    // can happen less often than this, but the interval forms the upper
    // limit to how fast we will go if there was constant ACL churn on the
    // remote end.
    ACLReplicationInterval time.Duration

    // ACLReplicationApplyLimit is the max number of replication-related
    // apply operations that we allow during a one second period. This is
    // used to limit the amount of Raft bandwidth used for replication.
    ACLReplicationApplyLimit int

    // TombstoneTTL is used to control how long KV tombstones are retained.
    // This provides a window of time where the X-Consul-Index is monotonic.
    // Outside this window, the index may not be monotonic. This is a result
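For context, a per-second apply cap like `ACLReplicationApplyLimit` is typically enforced with a token-bucket rate limiter. A minimal sketch, assuming the `golang.org/x/time/rate` package and a hypothetical list of pending apply operations — this is illustrative only, not Consul's actual implementation:

```go
package replication

import (
    "context"

    "golang.org/x/time/rate"
)

// applyWithLimit runs each pending apply operation, but never permits more
// than opsPerSec operations in any one-second window. The ops slice is a
// hypothetical stand-in for the real Raft apply calls.
func applyWithLimit(ctx context.Context, opsPerSec int, ops []func() error) error {
    limiter := rate.NewLimiter(rate.Limit(opsPerSec), opsPerSec)
    for _, apply := range ops {
        // Wait blocks until the token bucket permits another operation.
        if err := limiter.Wait(ctx); err != nil {
            return err // context canceled or deadline exceeded
        }
        if err := apply(); err != nil {
            return err
        }
    }
    return nil
}
```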
@ -271,21 +289,23 @@ func DefaultConfig() *Config {
    }

    conf := &Config{
        Datacenter:              DefaultDC,
        NodeName:                hostname,
        RPCAddr:                 DefaultRPCAddr,
        RaftConfig:              raft.DefaultConfig(),
        SerfLANConfig:           serf.DefaultConfig(),
        SerfWANConfig:           serf.DefaultConfig(),
        ReconcileInterval:       60 * time.Second,
        ProtocolVersion:         ProtocolVersion2Compatible,
        ACLTTL:                  30 * time.Second,
        ACLDefaultPolicy:        "allow",
        ACLDownPolicy:           "extend-cache",
        TombstoneTTL:            15 * time.Minute,
        TombstoneTTLGranularity: 30 * time.Second,
        SessionTTLMin:           10 * time.Second,
        DisableCoordinates:      false,
        Datacenter:               DefaultDC,
        NodeName:                 hostname,
        RPCAddr:                  DefaultRPCAddr,
        RaftConfig:               raft.DefaultConfig(),
        SerfLANConfig:            serf.DefaultConfig(),
        SerfWANConfig:            serf.DefaultConfig(),
        ReconcileInterval:        60 * time.Second,
        ProtocolVersion:          ProtocolVersion2Compatible,
        ACLTTL:                   30 * time.Second,
        ACLDefaultPolicy:         "allow",
        ACLDownPolicy:            "extend-cache",
        ACLReplicationInterval:   30 * time.Second,
        ACLReplicationApplyLimit: 100, // ops / sec
        TombstoneTTL:             15 * time.Minute,
        TombstoneTTLGranularity:  30 * time.Second,
        SessionTTLMin:            10 * time.Second,
        DisableCoordinates:       false,

        // These are tuned to provide a total throughput of 128 updates
        // per second. If you update these, you should update the client-

@ -18,6 +18,7 @@ import (
    "github.com/hashicorp/consul/acl"
    "github.com/hashicorp/consul/consul/agent"
    "github.com/hashicorp/consul/consul/state"
    "github.com/hashicorp/consul/consul/structs"
    "github.com/hashicorp/consul/tlsutil"
    "github.com/hashicorp/raft"
    "github.com/hashicorp/raft-boltdb"

@ -67,7 +68,7 @@ const (
// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
    // aclAuthCache is the authoritative ACL cache
    // aclAuthCache is the authoritative ACL cache.
    aclAuthCache *acl.Cache

    // aclCache is the non-authoritative ACL cache.

@ -149,6 +150,14 @@ type Server struct {
    // for the KV tombstones
    tombstoneGC *state.TombstoneGC

    // aclReplicationStatus (and its associated lock) provide information
    // about the health of the ACL replication goroutine.
    aclReplicationStatus     structs.ACLReplicationStatus
    aclReplicationStatusLock sync.RWMutex

    // shutdown and the associated members here are used in orchestrating
    // a clean shutdown. The shutdownCh is never written to, only closed to
    // indicate a shutdown has been initiated.
    shutdown     bool
    shutdownCh   chan struct{}
    shutdownLock sync.Mutex

@ -171,49 +180,47 @@ type endpoints struct {
// NewServer is used to construct a new Consul server from the
// configuration, potentially returning an error
func NewServer(config *Config) (*Server, error) {
    // Check the protocol version
    // Check the protocol version.
    if err := config.CheckVersion(); err != nil {
        return nil, err
    }

    // Check for a data directory!
    // Check for a data directory.
    if config.DataDir == "" && !config.DevMode {
        return nil, fmt.Errorf("Config must provide a DataDir")
    }

    // Sanity check the ACLs
    // Sanity check the ACLs.
    if err := config.CheckACL(); err != nil {
        return nil, err
    }

    // Ensure we have a log output
    // Ensure we have a log output and create a logger.
    if config.LogOutput == nil {
        config.LogOutput = os.Stderr
    }
    logger := log.New(config.LogOutput, "", log.LstdFlags)

    // Create the tls wrapper for outgoing connections
    // Create the TLS wrapper for outgoing connections.
    tlsConf := config.tlsConfig()
    tlsWrap, err := tlsConf.OutgoingTLSWrapper()
    if err != nil {
        return nil, err
    }

    // Get the incoming tls config
    // Get the incoming TLS config.
    incomingTLS, err := tlsConf.IncomingTLSConfig()
    if err != nil {
        return nil, err
    }

    // Create a logger
    logger := log.New(config.LogOutput, "", log.LstdFlags)

    // Create the tombstone GC
    // Create the tombstone GC.
    gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
    if err != nil {
        return nil, err
    }

    // Create server
    // Create server.
    s := &Server{
        config:   config,
        connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),

@ -229,32 +236,37 @@ func NewServer(config *Config) (*Server, error) {
        shutdownCh: make(chan struct{}),
    }

    // Initialize the authoritative ACL cache
    s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclFault)
    // Initialize the authoritative ACL cache.
    s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault)
    if err != nil {
        s.Shutdown()
        return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
        return nil, fmt.Errorf("Failed to create authoritative ACL cache: %v", err)
    }

    // Set up the non-authoritative ACL cache
    if s.aclCache, err = newAclCache(config, logger, s.RPC); err != nil {
    // Set up the non-authoritative ACL cache. A nil local function is given
    // if ACL replication isn't enabled.
    var local acl.FaultFunc
    if s.IsACLReplicationEnabled() {
        local = s.aclLocalFault
    }
    if s.aclCache, err = newAclCache(config, logger, s.RPC, local); err != nil {
        s.Shutdown()
        return nil, err
        return nil, fmt.Errorf("Failed to create non-authoritative ACL cache: %v", err)
    }

    // Initialize the RPC layer
    // Initialize the RPC layer.
    if err := s.setupRPC(tlsWrap); err != nil {
        s.Shutdown()
        return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
    }

    // Initialize the Raft server
    // Initialize the Raft server.
    if err := s.setupRaft(); err != nil {
        s.Shutdown()
        return nil, fmt.Errorf("Failed to start Raft: %v", err)
    }

    // Initialize the lan Serf
    // Initialize the LAN Serf.
    s.serfLAN, err = s.setupSerf(config.SerfLANConfig,
        s.eventChLAN, serfLANSnapshot, false)
    if err != nil {

@ -263,7 +275,7 @@ func NewServer(config *Config) (*Server, error) {
    }
    go s.lanEventHandler()

    // Initialize the wan Serf
    // Initialize the WAN Serf.
    s.serfWAN, err = s.setupSerf(config.SerfWANConfig,
        s.eventChWAN, serfWANSnapshot, true)
    if err != nil {

@ -272,11 +284,17 @@ func NewServer(config *Config) (*Server, error) {
    }
    go s.wanEventHandler()

    // Start listening for RPC requests
    // Start ACL replication.
    if s.IsACLReplicationEnabled() {
        go s.runACLReplication()
    }

    // Start listening for RPC requests.
    go s.listen()

    // Start the metrics handlers
    // Start the metrics handlers.
    go s.sessionStats()

    return s, nil
}

@ -662,7 +662,7 @@ type IndexedSessions struct {
    QueryMeta
}

// ACL is used to represent a token and it's rules
// ACL is used to represent a token and its rules
type ACL struct {
    ID   string
    Name string

@ -681,6 +681,21 @@ const (
    ACLDelete = "delete"
)

// IsSame checks if one ACL is the same as another, without looking
// at the Raft information (that's why we didn't call it IsEqual). This is
// useful for seeing if an update would be idempotent for all the functional
// parts of the structure.
func (a *ACL) IsSame(other *ACL) bool {
    if a.ID != other.ID ||
        a.Name != other.Name ||
        a.Type != other.Type ||
        a.Rules != other.Rules {
        return false
    }

    return true
}

// ACLRequest is used to create, update or delete an ACL
type ACLRequest struct {
    Datacenter string
@ -693,6 +708,9 @@ func (r *ACLRequest) RequestDatacenter() string {
    return r.Datacenter
}

// ACLRequests is a list of ACL change requests.
type ACLRequests []*ACLRequest

// ACLSpecificRequest is used to request an ACL by ID
type ACLSpecificRequest struct {
    Datacenter string

@ -730,6 +748,17 @@ type ACLPolicy struct {
    QueryMeta
}

// ACLReplicationStatus provides information about the health of the ACL
// replication system.
type ACLReplicationStatus struct {
    Enabled          bool
    Running          bool
    SourceDatacenter string
    ReplicatedIndex  uint64
    LastSuccess      time.Time
    LastError        time.Time
}

// Coordinate stores a node name with its associated network coordinate.
type Coordinate struct {
    Node string

@ -58,6 +58,53 @@ func TestStructs_Implements(t *testing.T) {
    )
}

func TestStructs_ACL_IsSame(t *testing.T) {
    acl := &ACL{
        ID:    "guid",
        Name:  "An ACL for testing",
        Type:  "client",
        Rules: "service \"\" { policy = \"read\" }",
    }
    if !acl.IsSame(acl) {
        t.Fatalf("should be equal to itself")
    }

    other := &ACL{
        ID:    "guid",
        Name:  "An ACL for testing",
        Type:  "client",
        Rules: "service \"\" { policy = \"read\" }",
        RaftIndex: RaftIndex{
            CreateIndex: 1,
            ModifyIndex: 2,
        },
    }
    if !acl.IsSame(other) || !other.IsSame(acl) {
        t.Fatalf("should not care about Raft fields")
    }

    check := func(twiddle, restore func()) {
        if !acl.IsSame(other) || !other.IsSame(acl) {
            t.Fatalf("should be the same")
        }

        twiddle()
        if acl.IsSame(other) || other.IsSame(acl) {
            t.Fatalf("should not be the same")
        }

        restore()
        if !acl.IsSame(other) || !other.IsSame(acl) {
            t.Fatalf("should be the same")
        }
    }

    check(func() { other.ID = "nope" }, func() { other.ID = "guid" })
    check(func() { other.Name = "nope" }, func() { other.Name = "An ACL for testing" })
    check(func() { other.Type = "management" }, func() { other.Type = "client" })
    check(func() { other.Rules = "" }, func() { other.Rules = "service \"\" { policy = \"read\" }" })
}

// testServiceNode gives a fully filled out ServiceNode instance.
func testServiceNode() *ServiceNode {
    return &ServiceNode{

@ -17,6 +17,7 @@ The following endpoints are supported:
* [`/v1/acl/info/<id>`](#acl_info): Queries the policy of a given token
* [`/v1/acl/clone/<id>`](#acl_clone): Creates a new token by cloning an existing token
* [`/v1/acl/list`](#acl_list): Lists all the active tokens
* [`/v1/acl/replication`](#acl_replication_status): Checks status of ACL replication

### <a name="acl_create"></a> /v1/acl/create

@ -166,3 +167,56 @@ It returns a JSON body like this:
  ...
]
```

### <a name="acl_replication_status"></a> /v1/acl/replication

The endpoint must be hit with a GET. It returns the status of the
[ACL replication](/docs/internals/acl.html#replication) process in
the datacenter. This is intended to be used by operators, or by
automation checking the health of ACL replication.

By default, the datacenter of the agent is queried; however, the dc can be provided
using the "?dc=" query parameter.

It returns a JSON body like this:

```javascript
{
  "Enabled": true,
  "Running": true,
  "SourceDatacenter": "dc1",
  "ReplicatedIndex": 1976,
  "LastSuccess": "2016-08-05T06:28:58Z",
  "LastError": "2016-08-05T06:28:28Z"
}
```

`Enabled` reports whether ACL replication is enabled for the datacenter.

`Running` reports whether the ACL replication process is running. The process
may take approximately 60 seconds to begin running after a leader election occurs.

`SourceDatacenter` is the authoritative ACL datacenter that ACLs are being
replicated from, and will match the
[`acl_datacenter`](/docs/agent/options.html#acl_datacenter) configuration.

`ReplicatedIndex` is the last index that was successfully replicated. You can
compare this to the `X-Consul-Index` header returned by the [`/v1/acl/list`](#acl_list)
endpoint to determine if the replication process has gotten all available
ACLs. Note that replication runs as a background process approximately every 30
seconds, and that local updates are rate limited to 100 updates/second, so it
may take several minutes to perform the initial sync of a large set of ACLs.
After the initial sync, replica lag should be on the order of about 30 seconds.

`LastSuccess` is the UTC time of the last successful sync operation. Note that
since ACL replication is done with a blocking query, this may not update for up
to 5 minutes if there have been no ACL changes to replicate. A zero value of
"0001-01-01T00:00:00Z" will be present if no sync has been successful.

`LastError` is the UTC time of the last error encountered during a sync operation.
If this time is later than `LastSuccess`, you can assume the replication process
is not in a good state. A zero value of "0001-01-01T00:00:00Z" will be present if
no sync has resulted in an error.

Please see the [ACL replication](/docs/internals/acl.html#replication)
section of the internals guide for more details.

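As a rough illustration, this status can be polled with nothing more than Go's standard library. A minimal sketch, assuming a local agent on the default HTTP port (`127.0.0.1:8500`) and a replica datacenter named `dc2`; the struct simply mirrors the JSON fields above:

```go
package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

// aclReplicationStatus mirrors the JSON body documented above.
type aclReplicationStatus struct {
    Enabled          bool
    Running          bool
    SourceDatacenter string
    ReplicatedIndex  uint64
    LastSuccess      time.Time
    LastError        time.Time
}

func main() {
    resp, err := http.Get("http://127.0.0.1:8500/v1/acl/replication?dc=dc2")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var status aclReplicationStatus
    if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
        panic(err)
    }

    // Treat replication as unhealthy if it's off, or if the most recent
    // sync attempt failed after the last success.
    healthy := status.Enabled && status.Running &&
        !status.LastError.After(status.LastSuccess)
    fmt.Printf("replication healthy=%v (index %d from %s)\n",
        healthy, status.ReplicatedIndex, status.SourceDatacenter)
}
```

A health check built this way can additionally compare `ReplicatedIndex` against the `X-Consul-Index` header returned by `/v1/acl/list` to estimate replication lag.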
@ -351,6 +351,17 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been assigned
  token. When you provide a value, it can be any string value. Using a UUID would ensure that it looks
  the same as the other tokens, but isn't strictly necessary.

* <a name="acl_replication_token"></a><a href="#acl_replication_token">`acl_replication_token`</a> -
  Only used for servers outside the [`acl_datacenter`](#acl_datacenter) running Consul 0.7 or later.
  When provided, this will enable [ACL replication](/docs/internals/acl.html#replication) using this
  token to retrieve and replicate the ACLs to the non-authoritative local datacenter.
  <br><br>
  If there's a partition or other outage affecting the authoritative datacenter, and the
  [`acl_down_policy`](/docs/agent/options.html#acl_down_policy) is set to "extend-cache", tokens not
  in the cache can be resolved during the outage using the replicated set of ACLs. Please see the
  [ACL replication](/docs/internals/acl.html#replication) section of the internals guide for more
  details.

* <a name="acl_token"></a><a href="#acl_token">`acl_token`</a> - When provided, the agent will use this
  token when making requests to the Consul servers. Clients can override this token on a per-request
  basis by providing the "?token" query parameter. When not provided, the empty token, which maps to

@ -39,14 +39,24 @@ prior versions do not provide a token. This is handled by the special "anonymous"
token. If no token is provided, the rules associated with the anonymous token are
automatically applied: this allows policy to be enforced on legacy clients.

ACLs can also act in either a whitelist or blacklist mode depending
on the configuration of
[`acl_default_policy`](/docs/agent/options.html#acl_default_policy). If the
default policy is to deny all actions, then token rules can be set to whitelist
specific actions. In the inverse, the allow all default behavior is a blacklist
where rules are used to prohibit actions. By default, Consul will allow all
actions.

#### ACL Datacenter

Enforcement is always done by the server nodes. All servers must be configured
to provide an [`acl_datacenter`](/docs/agent/options.html#acl_datacenter) which
enables ACL enforcement but also specifies the authoritative datacenter. Consul does not
replicate data cross-WAN and instead relies on [RPC forwarding](/docs/internals/architecture.html)
to support Multi-Datacenter configurations. However, because requests can be made
enables ACL enforcement but also specifies the authoritative datacenter. Consul
relies on [RPC forwarding](/docs/internals/architecture.html) to support
Multi-Datacenter configurations. However, because requests can be made
across datacenter boundaries, ACL tokens must be valid globally. To avoid
replication issues, a single datacenter is considered authoritative and stores
all the tokens.
consistency issues, a single datacenter is considered authoritative and stores
the canonical set of tokens.

When a request is made to a server in a non-authoritative datacenter, it
must be resolved into the appropriate policy. This is done by reading the token
@ -55,7 +65,9 @@ from the authoritative server and caching the result for a configurable
of caching is that the cache TTL is an upper bound on the staleness of policy
that is enforced. It is possible to set a zero TTL, but this has adverse
performance impacts, as every request requires refreshing the policy via a
cross-datacenter WAN call.
cross-datacenter WAN RPC call.

#### Outages and ACL Replication

The Consul ACL system is designed with flexible rules to accommodate an outage
of the [`acl_datacenter`](/docs/agent/options.html#acl_datacenter) or networking
@ -66,114 +78,46 @@ choices to tune behavior. It is possible to deny or permit all actions or to ignore
cache TTLs and enter a fail-safe mode. The default is to ignore cache TTLs
for any previously resolved tokens and to deny any uncached tokens.

ACLs can also act in either a whitelist or blacklist mode depending
on the configuration of
[`acl_default_policy`](/docs/agent/options.html#acl_default_policy). If the
default policy is to deny all actions, then token rules can be set to whitelist
specific actions. In the inverse, the allow all default behavior is a blacklist
where rules are used to prohibit actions. By default, Consul will allow all
actions.
<a name="replication"></a>
Consul 0.7 added an ACL Replication capability that can allow non-authoritative
datacenter servers to resolve even uncached tokens. This is enabled by setting an
[`acl_replication_token`](/docs/agent/options.html#acl_replication_token) in the
configuration on the servers in the non-authoritative datacenters. With replication
enabled, the servers will maintain a replica of the authoritative datacenter's full
set of ACLs on the non-authoritative servers.

### Blacklist mode and `consul exec`
Replication occurs with a background process that looks for new ACLs approximately
every 30 seconds. Replicated changes are written at a rate that's throttled to
100 updates/second, so it may take several minutes to perform the initial sync of
a large set of ACLs.

If you set [`acl_default_policy`](/docs/agent/options.html#acl_default_policy)
to `deny`, the `anonymous` token won't have permission to read the default
`_rexec` prefix; therefore, Consul agents using the `anonymous` token
won't be able to perform [`consul exec`](/docs/commands/exec.html) actions.
If there's a partition or other outage affecting the authoritative datacenter,
and the [`acl_down_policy`](/docs/agent/options.html#acl_down_policy)
is set to "extend-cache", tokens will be resolved during the outage using the
replicated set of ACLs. An [ACL replication status](/docs/agent/http/acl.html#acl_replication_status)
endpoint is available to monitor the health of the replication process.

Here's why: the agents need read/write permission to the `_rexec` prefix for
[`consul exec`](/docs/commands/exec.html) to work properly. They use that prefix
as the transport for most data.
Locally-resolved ACLs will be cached using the [`acl_ttl`](/docs/agent/options.html#acl_ttl)
setting of the non-authoritative datacenter, so these entries may persist in the
cache for up to the TTL, even after the authoritative datacenter comes back online.

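At its core, each replication pass boils down to diffing the remote (authoritative) ACL set against the local replica and then applying the difference. A self-contained sketch of that diff step in Go, using illustrative names rather than Consul's internal types:

```go
package main

import "fmt"

// aclEntry is a minimal stand-in for a replicated ACL token.
type aclEntry struct {
    ID    string
    Rules string
}

// diffACLs returns the remote entries that are new or changed locally,
// plus the local IDs that no longer exist remotely and must be deleted.
func diffACLs(local, remote map[string]aclEntry) (upserts []aclEntry, deletes []string) {
    for id, r := range remote {
        if l, ok := local[id]; !ok || l != r {
            upserts = append(upserts, r)
        }
    }
    for id := range local {
        if _, ok := remote[id]; !ok {
            deletes = append(deletes, id)
        }
    }
    return upserts, deletes
}

func main() {
    local := map[string]aclEntry{
        "a": {ID: "a", Rules: `key "" { policy = "read" }`},
        "b": {ID: "b", Rules: `key "" { policy = "write" }`},
    }
    remote := map[string]aclEntry{
        "a": {ID: "a", Rules: `key "" { policy = "deny" }`}, // changed remotely
        "c": {ID: "c", Rules: `key "" { policy = "read" }`}, // new remotely
    }
    upserts, deletes := diffACLs(local, remote)
    fmt.Println("upserts:", upserts, "deletes:", deletes) // "b" is deleted
}
```

Deletes fall out of IDs that exist locally but are gone remotely, which is how a token deleted in the authoritative datacenter propagates to the replicas.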
You can enable [`consul exec`](/docs/commands/exec.html) from agents that are not
configured with a token by allowing the `anonymous` token to access that prefix.
This can be done by giving this rule to the `anonymous` token:
ACL replication can also be used to migrate ACLs from one datacenter to another
using a process like this:

```javascript
key "_rexec/" {
  policy = "write"
}
```
1. Enable ACL replication in all datacenters to allow continuation of service
   during the migration, and to populate the target datacenter. Verify replication
   is healthy and caught up to the current ACL index in the target datacenter
   using the [ACL replication status](/docs/agent/http/acl.html#acl_replication_status)
   endpoint.
2. Turn down the old authoritative datacenter servers.
3. Rolling restart the servers in the target datacenter and change the
   `acl_datacenter` configuration to itself. This will automatically turn off
   replication and will enable the datacenter to start acting as the authoritative
   datacenter, using its replicated ACLs from before.
4. Rolling restart the servers in other datacenters and change their `acl_datacenter`
   configuration to the target datacenter.

Alternatively, you can, of course, add an explicit
[`acl_token`](/docs/agent/options.html#acl_token) to each agent, giving it access
to that prefix.

### Blacklist mode and Service Discovery

If your [`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is
set to `deny`, the `anonymous` token will be unable to read any service
information. This will cause the service discovery mechanisms in the REST API
and the DNS interface to return no results for any service queries. This is
because internally the APIs and DNS interface consume the RPC interface, which
will filter results for services the token has no access to.

You can allow all services to be discovered, mimicking the behavior of pre-0.6.0
releases, by configuring this ACL rule for the `anonymous` token:

```
service "" {
  policy = "read"
}
```

Note that the above will allow access for reading service information only. This
level of access allows discovering other services in the system, but is not
enough to allow the agent to sync its services and checks into the global
catalog during [anti-entropy](/docs/internals/anti-entropy.html).

The most secure way of handling service registration and discovery is to run
Consul 0.6+ and issue tokens with explicit access for the services or service
prefixes which are expected to run on each agent.

### Blacklist mode and Events

Similar to the above, if your
[`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is set to
`deny`, the `anonymous` token will have no access to allow firing user events.
This deviates from pre-0.6.0 builds, where user events were completely
unrestricted.

Events have their own first-class expression in the ACL syntax. To restore
access to user events from arbitrary agents, configure an ACL rule like the
following for the `anonymous` token:

```
event "" {
  policy = "write"
}
```

As always, the more secure way to handle user events is to explicitly grant
access to each API token based on the events they should be able to fire.

### Blacklist mode and Prepared Queries

After Consul 0.6.3, significant changes were made to ACLs for prepared queries,
including a new `query` ACL policy. See [Prepared Query ACLs](#prepared_query_acls) below for more details.

### Blacklist mode and Keyring Operations

Consul 0.6 and later supports securing the encryption keyring operations using
ACLs. Encryption is an optional component of the gossip layer. More information
about Consul's keyring operations can be found on the [keyring
command](/docs/commands/keyring.html) documentation page.

If your [`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is
set to `deny`, then the `anonymous` token will not have access to read or write
to the encryption keyring. The keyring policy is yet another first-class citizen
in the ACL syntax. You can configure the anonymous token to have free rein over
the keyring using a policy like the following:

```
keyring = "write"
```

Encryption keyring operations are sensitive and should be properly secured. It
is recommended that instead of configuring a wide-open policy like above, a
per-token policy is applied to maximize security.

### Bootstrapping ACLs
#### Bootstrapping ACLs

Bootstrapping the ACL system is done by providing an initial [`acl_master_token`
configuration](/docs/agent/options.html#acl_master_token) which will be created

@ -187,8 +131,7 @@ for all servers. Once this is done, restart the current leader to force a leader election.
## Rule Specification

A core part of the ACL system is a rule language which is used to describe the policy
that must be enforced. Consul supports ACLs for both [K/Vs](/intro/getting-started/kv.html)
and [services](/intro/getting-started/services.html).
that must be enforced.

Key policies are defined by coupling a prefix with a policy. The rules are enforced
using a longest-prefix match policy: Consul picks the most specific policy possible. The

|
|||
}
|
||||
```
|
||||
|
||||
## Services and Checks with ACLs
|
||||
## Building ACL Policies
|
||||
|
||||
#### Blacklist mode and `consul exec`
|
||||
|
||||
If you set [`acl_default_policy`](/docs/agent/options.html#acl_default_policy)
|
||||
to `deny`, the `anonymous` token won't have permission to read the default
|
||||
`_rexec` prefix; therefore, Consul agents using the `anonymous` token
|
||||
won't be able to perform [`consul exec`](/docs/commands/exec.html) actions.
|
||||
|
||||
Here's why: the agents need read/write permission to the `_rexec` prefix for
|
||||
[`consul exec`](/docs/commands/exec.html) to work properly. They use that prefix
|
||||
as the transport for most data.
|
||||
|
||||
You can enable [`consul exec`](/docs/commands/exec.html) from agents that are not
|
||||
configured with a token by allowing the `anonymous` token to access that prefix.
|
||||
This can be done by giving this rule to the `anonymous` token:
|
||||
|
||||
```javascript
|
||||
key "_rexec/" {
|
||||
policy = "write"
|
||||
}
|
||||
```
|
||||
|
||||
Alternatively, you can, of course, add an explicit
|
||||
[`acl_token`](/docs/agent/options.html#acl_token) to each agent, giving it access
|
||||
to that prefix.
|
||||
|
||||
#### Blacklist mode and Service Discovery
|
||||
|
||||
If your [`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is
|
||||
set to `deny`, the `anonymous` token will be unable to read any service
|
||||
information. This will cause the service discovery mechanisms in the REST API
|
||||
and the DNS interface to return no results for any service queries. This is
|
||||
because internally the API's and DNS interface consume the RPC interface, which
|
||||
will filter results for services the token has no access to.
|
||||
|
||||
You can allow all services to be discovered, mimicing the behavior of pre-0.6.0
|
||||
releases, by configuring this ACL rule for the `anonymous` token:
|
||||
|
||||
```
|
||||
service "" {
|
||||
policy = "read"
|
||||
}
|
||||
```
|
||||
|
||||
Note that the above will allow access for reading service information only. This
|
||||
level of access allows discovering other services in the system, but is not
|
||||
enough to allow the agent to sync its services and checks into the global
|
||||
catalog during [anti-entropy](/docs/internals/anti-entropy.html).
|
||||
|
||||
The most secure way of handling service registration and discovery is to run
|
||||
Consul 0.6+ and issue tokens with explicit access for the services or service
|
||||
prefixes which are expected to run on each agent.
|
||||
|
||||
#### Blacklist mode and Events
|
||||
|
||||
Similar to the above, if your
|
||||
[`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is set to
|
||||
`deny`, the `anonymous` token will have no access to allow firing user events.
|
||||
This deviates from pre-0.6.0 builds, where user events were completely
|
||||
unrestricted.
|
||||
|
||||
Events have their own first-class expression in the ACL syntax. To restore
|
||||
access to user events from arbitrary agents, configure an ACL rule like the
|
||||
following for the `anonymous` token:
|
||||
|
||||
```
|
||||
event "" {
|
||||
policy = "write"
|
||||
}
|
||||
```
|
||||
|
||||
As always, the more secure way to handle user events is to explicitly grant
|
||||
access to each API token based on the events they should be able to fire.
|
||||
|
||||
#### Blacklist mode and Prepared Queries
|
||||
|
||||
After Consul 0.6.3, significant changes were made to ACLs for prepared queries,
|
||||
including a new `query` ACL policy. See [Prepared Query ACLs](#prepared_query_acls) below for more details.
|
||||
|
||||
#### Blacklist mode and Keyring Operations
|
||||
|
||||
Consul 0.6 and later supports securing the encryption keyring operations using
|
||||
ACL's. Encryption is an optional component of the gossip layer. More information
|
||||
about Consul's keyring operations can be found on the [keyring
|
||||
command](/docs/commands/keyring.html) documentation page.
|
||||
|
||||
If your [`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is
|
||||
set to `deny`, then the `anonymous` token will not have access to read or write
|
||||
to the encryption keyring. The keyring policy is yet another first-class citizen
|
||||
in the ACL syntax. You can configure the anonymous token to have free reign over
|
||||
the keyring using a policy like the following:
|
||||
|
||||
```
|
||||
keyring = "write"
|
||||
```
|
||||
|
||||
Encryption keyring operations are sensitive and should be properly secured. It
|
||||
is recommended that instead of configuring a wide-open policy like above, a
|
||||
per-token policy is applied to maximize security.
|
||||
|
||||
#### Services and Checks with ACLs
|
||||
|
||||
Consul allows configuring ACL policies which may control access to service and
|
||||
check registration. In order to successfully register a service or check with
|
||||
|
@ -330,7 +374,7 @@ methods of configuring ACL tokens to use for registration events:
  [HTTP API](/docs/agent/http.html) for operations that require them.

<a name="discovery_acls"></a>
## Restricting service discovery with ACLs
#### Restricting service discovery with ACLs

In Consul 0.6, the ACL system was extended to support restricting read access to
service registrations. This allows tighter access control and limits the ability

@ -413,7 +457,7 @@ Capturing ACL Tokens is analogous to
  Token is similar to the complementary `SECURITY INVOKER` attribute.

<a name="prepared_query_acl_changes"></a>
#### ACL Implementation Changes
#### ACL Implementation Changes for Prepared Queries

Prepared queries were originally introduced in Consul 0.6.0, and ACL behavior remained
unchanged through version 0.6.3, but was then changed to allow better management of the