From d336bdd7b058e88ae1005f4118137113d3511595 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 2 Aug 2016 22:04:11 -0700 Subject: [PATCH 01/18] Adds basic ACL replication plumbing. --- command/agent/agent.go | 3 + command/agent/config.go | 9 + command/agent/config_test.go | 7 +- consul/acl.go | 2 +- consul/acl_endpoint.go | 74 ++++---- consul/acl_replication.go | 309 +++++++++++++++++++++++++++++++++ consul/config.go | 50 ++++-- consul/server.go | 52 ++++-- consul/structs/structs.go | 20 ++- consul/structs/structs_test.go | 47 +++++ 10 files changed, 507 insertions(+), 66 deletions(-) create mode 100644 consul/acl_replication.go diff --git a/command/agent/agent.go b/command/agent/agent.go index 42fda9963c..79b94d55db 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -339,6 +339,9 @@ func (a *Agent) consulConfig() *consul.Config { if a.config.ACLDownPolicy != "" { base.ACLDownPolicy = a.config.ACLDownPolicy } + if a.config.ACLReplicationToken != "" { + base.ACLReplicationToken = a.config.ACLReplicationToken + } if a.config.SessionTTLMinRaw != "" { base.SessionTTLMin = a.config.SessionTTLMin } diff --git a/command/agent/config.go b/command/agent/config.go index 11d58c2c92..a83a2ddfd6 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -452,6 +452,12 @@ type Config struct { // this acts like deny. ACLDownPolicy string `mapstructure:"acl_down_policy"` + // ACLReplicationToken is used to fetch ACLs from the ACLDatacenter in + // order to replicate them locally. Setting this to a non-empty value + // also enables replication. Replication is only available in datacenters + // other than the ACLDatacenter. + ACLReplicationToken string `mapstructure:"acl_replication_token"` + // Watches are used to monitor various endpoints and to invoke a // handler to act appropriately. These are managed entirely in the // agent layer using the standard APIs. @@ -1319,6 +1325,9 @@ func MergeConfig(a, b *Config) *Config { if b.ACLDefaultPolicy != "" { result.ACLDefaultPolicy = b.ACLDefaultPolicy } + if b.ACLReplicationToken != "" { + result.ACLReplicationToken = b.ACLReplicationToken + } if len(b.Watches) != 0 { result.Watches = append(result.Watches, b.Watches...) } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 06e5d85427..00e5a0dfb3 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -622,7 +622,8 @@ func TestDecodeConfig(t *testing.T) { // ACLs input = `{"acl_token": "1234", "acl_datacenter": "dc2", "acl_ttl": "60s", "acl_down_policy": "deny", - "acl_default_policy": "deny", "acl_master_token": "2345"}` + "acl_default_policy": "deny", "acl_master_token": "2345", + "acl_replication_token": "8675309"}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) if err != nil { t.Fatalf("err: %s", err) @@ -646,6 +647,9 @@ func TestDecodeConfig(t *testing.T) { if config.ACLDefaultPolicy != "deny" { t.Fatalf("bad: %#v", config) } + if config.ACLReplicationToken != "8675309" { + t.Fatalf("bad: %#v", config) + } // Watches input = `{"watches": [{"type":"keyprefix", "prefix":"foo/", "handler":"foobar"}]}` @@ -1432,6 +1436,7 @@ func TestMergeConfig(t *testing.T) { ACLTTLRaw: "15s", ACLDownPolicy: "deny", ACLDefaultPolicy: "deny", + ACLReplicationToken: "8765309", Watches: []map[string]interface{}{ map[string]interface{}{ "type": "keyprefix", diff --git a/consul/acl.go b/consul/acl.go index fa3f558a6c..d2a6461731 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -74,7 +74,7 @@ func (s *Server) aclFault(id string) (string, string, error) { return s.config.ACLDefaultPolicy, acl.Rules, nil } -// resolveToken is used to resolve an ACL is any is appropriate +// resolveToken is used to resolve an ACL if any is appropriate func (s *Server) resolveToken(id string) (acl.ACL, error) { // Check if there is no ACL datacenter (ACL's disabled) authDC := s.config.ACLDatacenter diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index 49f927161a..d14c90d289 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -15,26 +15,13 @@ type ACL struct { srv *Server } -// Apply is used to apply a modifying request to the data store. This should -// only be used for operations that modify the data -func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { - if done, err := a.srv.forward("ACL.Apply", args, args, reply); done { - return err - } - defer metrics.MeasureSince([]string{"consul", "acl", "apply"}, time.Now()) - - // Verify we are allowed to serve this request - if a.srv.config.ACLDatacenter != a.srv.config.Datacenter { - return fmt.Errorf(aclDisabled) - } - - // Verify token is permitted to modify ACLs - if acl, err := a.srv.resolveToken(args.Token); err != nil { - return err - } else if acl == nil || !acl.ACLModify() { - return permissionDeniedErr - } - +// aclApplyInternal is used to apply an ACL request after it has been vetted that +// this is a valid operation. It is used when users are updating ACLs, in which +// case we check their token to make sure they have management privileges. It is +// also used for ACL replication. We want to run the replicated ACLs through the +// same checks on the change itself. If an operation needs to generate an ID, +// routine will fill in an ID with the args as part of the request. +func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) error { switch args.Op { case structs.ACLSet: // Verify the ACL type @@ -61,16 +48,16 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { // deterministic. Once the entry is in the log, the state update MUST // be deterministic or the followers will not converge. if args.ACL.ID == "" { - state := a.srv.fsm.State() + state := srv.fsm.State() for { if args.ACL.ID, err = uuid.GenerateUUID(); err != nil { - a.srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err) + srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err) return err } _, acl, err := state.ACLGet(args.ACL.ID) if err != nil { - a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err) + srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err) return err } if acl == nil { @@ -91,24 +78,53 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { } // Apply the update - resp, err := a.srv.raftApply(structs.ACLRequestType, args) + resp, err := srv.raftApply(structs.ACLRequestType, args) if err != nil { - a.srv.logger.Printf("[ERR] consul.acl: Apply failed: %v", err) + srv.logger.Printf("[ERR] consul.acl: Apply failed: %v", err) return err } if respErr, ok := resp.(error); ok { return respErr } + // Check if the return type is a string + if respString, ok := resp.(string); ok { + *reply = respString + } + + return nil +} + +// Apply is used to apply a modifying request to the data store. This should +// only be used for operations that modify the data +func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { + if done, err := a.srv.forward("ACL.Apply", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"consul", "acl", "apply"}, time.Now()) + + // Verify we are allowed to serve this request + if a.srv.config.ACLDatacenter != a.srv.config.Datacenter { + return fmt.Errorf(aclDisabled) + } + + // Verify token is permitted to modify ACLs + if acl, err := a.srv.resolveToken(args.Token); err != nil { + return err + } else if acl == nil || !acl.ACLModify() { + return permissionDeniedErr + } + + // Do the apply now that this update is vetted. + if err := aclApplyInternal(a.srv, args, reply); err != nil { + return err + } + // Clear the cache if applicable if args.ACL.ID != "" { a.srv.aclAuthCache.ClearACL(args.ACL.ID) } - // Check if the return type is a string - if respString, ok := resp.(string); ok { - *reply = respString - } return nil } diff --git a/consul/acl_replication.go b/consul/acl_replication.go new file mode 100644 index 0000000000..af4aebffb7 --- /dev/null +++ b/consul/acl_replication.go @@ -0,0 +1,309 @@ +package consul + +import ( + "fmt" + "sort" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/lib" +) + +// aclIDSorter is used to make sure a given list of ACLs is sorted by token ID. +// This should always be true, but since this is crucial for correctness and we +// are accepting input from another server, we sort to make sure. +type aclIDSorter struct { + acls structs.ACLs +} + +// See sort.Interface. +func (a *aclIDSorter) Len() int { + return len(a.acls) +} + +// See sort.Interface. +func (a *aclIDSorter) Swap(i, j int) { + a.acls[i], a.acls[j] = a.acls[j], a.acls[i] +} + +// See sort.Interface. +func (a *aclIDSorter) Less(i, j int) bool { + return a.acls[i].ID < a.acls[j].ID +} + +// aclIterator simplifies the algorithm below by providing a basic iterator that +// moves through a list of ACLs and returns nil when it's exhausted. +type aclIterator struct { + acls structs.ACLs + + // index is the current position of the iterator. + index int +} + +// Front returns the item at index position, or nil if the list is exhausted. +func (a *aclIterator) Front() *structs.ACL { + if a.index < len(a.acls) { + return a.acls[a.index] + } + + return nil +} + +// Next advances the iterator to the next index. +func (a *aclIterator) Next() { + a.index++ +} + +// reconcileACLs takes the local and remote ACL state, and produces a list of +// changes required in order to bring the local ACLs into sync with the remote +// ACLs. You can supply lastRemoteIndex as a hint that replication has succeeded +// up to that remote index and it will make this process more efficient by only +// comparing ACL entries modified after that index. Setting this to 0 will force +// a full compare of all existing ACLs. +func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests { + // Since sorting the lists is crucial for correctness, we are depending + // on data coming from other servers potentially running a different, + // version of Consul, and sorted-ness is kind of a subtle property of + // the state store indexing, it's prudent to make sure things are sorted + // before we begin. + sort.Sort(&aclIDSorter{local}) + sort.Sort(&aclIDSorter{remote}) + + // Run through both lists and reconcile them. + var changes structs.ACLRequests + localIter, remoteIter := &aclIterator{local, 0}, &aclIterator{remote, 0} + for localIter.Front() != nil || remoteIter.Front() != nil { + // If the local list is exhausted, then process this as a remote + // add. We know from the loop condition that there's something + // in the remote list. + if localIter.Front() == nil { + changes = append(changes, &structs.ACLRequest{ + Op: structs.ACLSet, + ACL: *(remoteIter.Front()), + }) + remoteIter.Next() + continue + } + + // If the remote list is exhausted, then process this as a local + // delete. We know from the loop condition that there's something + // in the local list. + if remoteIter.Front() == nil { + changes = append(changes, &structs.ACLRequest{ + Op: structs.ACLDelete, + ACL: *(localIter.Front()), + }) + localIter.Next() + continue + } + + // At this point we know there's something at the front of each + // list we need to resolve. + + // If the remote list has something local doesn't, we add it. + if localIter.Front().ID > remoteIter.Front().ID { + changes = append(changes, &structs.ACLRequest{ + Op: structs.ACLSet, + ACL: *(remoteIter.Front()), + }) + remoteIter.Next() + continue + } + + // If local has something remote doesn't, we delete it. + if localIter.Front().ID < remoteIter.Front().ID { + changes = append(changes, &structs.ACLRequest{ + Op: structs.ACLDelete, + ACL: *(localIter.Front()), + }) + localIter.Next() + continue + } + + // Local and remote have an ACL with the same ID, so we might + // need to compare them. + l, r := localIter.Front(), remoteIter.Front() + if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) { + changes = append(changes, &structs.ACLRequest{ + Op: structs.ACLSet, + ACL: *r, + }) + } + localIter.Next() + remoteIter.Next() + } + return changes +} + +// FetchLocalACLs returns the ACLs in the local state store. +func (s *Server) fetchLocalACLs() (structs.ACLs, error) { + _, local, err := s.fsm.State().ACLList() + if err != nil { + return nil, err + } + return local, nil +} + +// FetchRemoteACLs is used to get the remote set of ACLs from the ACL +// datacenter. The lastIndex parameter is a hint about which remote index we +// have replicated to, so this is expected to block until something changes. +func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) { + args := structs.DCSpecificRequest{ + Datacenter: s.config.ACLDatacenter, + QueryOptions: structs.QueryOptions{ + Token: s.config.ACLReplicationToken, + MinQueryIndex: lastRemoteIndex, + AllowStale: true, + }, + } + var remote structs.IndexedACLs + if err := s.RPC("ACL.List", &args, &remote); err != nil { + return nil, err + } + return &remote, nil +} + +// UpdateLocalACLs is given a list of changes to apply in order to bring the +// local ACLs in-line with the remote ACLs from the ACL datacenter. +func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { + var ops int + start := time.Now() + for _, change := range changes { + // Do a very simple rate limit algorithm where we check every N + // operations and wait out to the second before we continue. If + // it's going slower than that, the sleep time will be negative + // so we will just keep going without delay. + if ops > s.config.ACLReplicationApplyLimit { + elapsed := time.Now().Sub(start) + time.Sleep(1*time.Second - elapsed) + ops, start = 0, time.Now() + } + + // Note that we are using the single ACL interface here and not + // performing all this inside a single transaction. This is OK + // for two reasons. First, there's nothing else other than this + // replication routine that alters the local ACLs, so there's + // nothing to contend with locally. Second, if an apply fails + // in the middle (most likely due to losing leadership), the + // next replication pass will clean up and check everything + // again. + var reply string + if err := aclApplyInternal(s, change, &reply); err != nil { + return err + } + ops++ + } + return nil +} + +// replicateACLs is a runs one pass of the algorithm for replicating ACLs from +// a remote ACL datacenter to local state. If there's any error, this will return +// 0 for the lastRemoteIndex, which will cause us to immediately do a full sync +// next time. +func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) { + remote, err := s.fetchRemoteACLs(lastRemoteIndex) + if err != nil { + return 0, fmt.Errorf("failed to retrieve remote ACLs: %v", err) + } + + // This will be pretty common because we will be blocking for a long time + // and may have lost leadership, so lets control the message here instead + // of returning deeper error messages from from Raft. + if !s.IsLeader() { + return 0, fmt.Errorf("no longer cluster leader") + } + + // Measure everything after the remote query, which can block for long + // periods of time. This metric is a good measure of how expensive the + // replication process is. + defer metrics.MeasureSince([]string{"consul", "leader", "replicateACLs"}, time.Now()) + + local, err := s.fetchLocalACLs() + if err != nil { + return 0, fmt.Errorf("failed to retrieve local ACLs: %v", err) + } + + // If the remote index ever goes backwards, it's a good indication that + // the remote side was rebuilt and we should do a full sync since we + // can't make any assumptions about what's going on. + if remote.QueryMeta.Index < lastRemoteIndex { + s.logger.Printf("[WARN] consul: ACL replication remote index moved backwards (%d to %d), forcing a full ACL sync", lastRemoteIndex, remote.QueryMeta.Index) + lastRemoteIndex = 0 + } + + changes := reconcileACLs(local, remote.ACLs, lastRemoteIndex) + if err := s.updateLocalACLs(changes); err != nil { + return 0, fmt.Errorf("failed to sync ACL changes: %v", err) + } + + // Return the index we got back from the remote side, since we've synced + // up with the remote state as of that index. + return remote.QueryMeta.Index, nil +} + +// IsACLReplicationEnabled returns true if ACL replication is enabled. +func (s *Server) IsACLReplicationEnabled() bool { + authDC := s.config.ACLDatacenter + return len(authDC) > 0 && (authDC != s.config.Datacenter) && + len(s.config.ACLReplicationToken) > 0 +} + +// runACLReplication is a long-running goroutine that will attempt to replicate +// ACLs while the server is the leader, until the shutdown channel closes. +func (s *Server) runACLReplication() { + defer s.shutdownWait.Done() + + // Give each server's replicator a random initial phase for good + // measure. + select { + case <-time.After(lib.RandomStagger(s.config.ACLReplicationInterval)): + case <-s.shutdownCh: + } + + var lastRemoteIndex uint64 + var wasActive bool + replicate := func() { + if !wasActive { + s.logger.Printf("[INFO] consul: ACL replication started") + wasActive = true + } + + var err error + lastRemoteIndex, err = s.replicateACLs(lastRemoteIndex) + if err != nil { + s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err) + } else { + s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", lastRemoteIndex) + } + } + pause := func() { + if wasActive { + s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)") + wasActive = false + } + } + + // This will slowly poll to see if replication should be active. Once it + // is and we've caught up, the replicate() call will begin to block and + // only wake up when the query timer expires or there are new ACLs to + // replicate. We've chosen this design so that the ACLReplicationInterval + // is the lower bound for how quickly we will replicate, no matter how + // much ACL churn is happening on the remote side. + // + // The blocking query inside replicate() respects the shutdown channel, + // so we won't get stuck in here as things are torn down. + for { + select { + case <-s.shutdownCh: + return + + case <-time.After(s.config.ACLReplicationInterval): + if s.IsLeader() { + replicate() + } else { + pause() + } + } + } +} diff --git a/consul/config.go b/consul/config.go index 8e252b6351..0e9cee4384 100644 --- a/consul/config.go +++ b/consul/config.go @@ -173,6 +173,24 @@ type Config struct { // "allow" can be used to allow all requests. This is not recommended. ACLDownPolicy string + // ACLReplicationToken is used to fetch ACLs from the ACLDatacenter in + // order to replicate them locally. Setting this to a non-empty value + // also enables replication. Replication is only available in datacenters + // other than the ACLDatacenter. + ACLReplicationToken string + + // ACLReplicationInterval is the interval at which replication passes + // will occur. Queries to the ACLDatacenter may block, so replication + // can happen less often than this, but the interval forms the upper + // limit to how fast we will go if there was constant ACL churn on the + // remote end. + ACLReplicationInterval time.Duration + + // ACLReplicationApplyLimit is the max number of replication-related + // apply operations that we allow during a one second period. This is + // used to limit the amount of Raft bandwidth used for replication. + ACLReplicationApplyLimit int + // TombstoneTTL is used to control how long KV tombstones are retained. // This provides a window of time where the X-Consul-Index is monotonic. // Outside this window, the index may not be monotonic. This is a result @@ -271,21 +289,23 @@ func DefaultConfig() *Config { } conf := &Config{ - Datacenter: DefaultDC, - NodeName: hostname, - RPCAddr: DefaultRPCAddr, - RaftConfig: raft.DefaultConfig(), - SerfLANConfig: serf.DefaultConfig(), - SerfWANConfig: serf.DefaultConfig(), - ReconcileInterval: 60 * time.Second, - ProtocolVersion: ProtocolVersion2Compatible, - ACLTTL: 30 * time.Second, - ACLDefaultPolicy: "allow", - ACLDownPolicy: "extend-cache", - TombstoneTTL: 15 * time.Minute, - TombstoneTTLGranularity: 30 * time.Second, - SessionTTLMin: 10 * time.Second, - DisableCoordinates: false, + Datacenter: DefaultDC, + NodeName: hostname, + RPCAddr: DefaultRPCAddr, + RaftConfig: raft.DefaultConfig(), + SerfLANConfig: serf.DefaultConfig(), + SerfWANConfig: serf.DefaultConfig(), + ReconcileInterval: 60 * time.Second, + ProtocolVersion: ProtocolVersion2Compatible, + ACLTTL: 30 * time.Second, + ACLDefaultPolicy: "allow", + ACLDownPolicy: "extend-cache", + ACLReplicationInterval: 30 * time.Second, + ACLReplicationApplyLimit: 100, // ops / sec + TombstoneTTL: 15 * time.Minute, + TombstoneTTLGranularity: 30 * time.Second, + SessionTTLMin: 10 * time.Second, + DisableCoordinates: false, // These are tuned to provide a total throughput of 128 updates // per second. If you update these, you should update the client- diff --git a/consul/server.go b/consul/server.go index 4138e2cc10..02490c6ae0 100644 --- a/consul/server.go +++ b/consul/server.go @@ -149,9 +149,17 @@ type Server struct { // for the KV tombstones tombstoneGC *state.TombstoneGC + // shutdown and the associated members here are used in orchestrating + // a clean shutdown. The shutdownCh is never written to, only closed to + // indicate a shutdown has been initiated. The shutdownWait group will + // be waited on after closing the shutdownCh, but before any other + // shutdown activities take place in the server. Anything added to the + // shutdownWait group will block all the rest of shutdown, so use this + // sparingly and carefully. shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex + shutdownWait sync.WaitGroup } // Holds the RPC endpoints @@ -171,49 +179,47 @@ type endpoints struct { // NewServer is used to construct a new Consul server from the // configuration, potentially returning an error func NewServer(config *Config) (*Server, error) { - // Check the protocol version + // Check the protocol version. if err := config.CheckVersion(); err != nil { return nil, err } - // Check for a data directory! + // Check for a data directory. if config.DataDir == "" && !config.DevMode { return nil, fmt.Errorf("Config must provide a DataDir") } - // Sanity check the ACLs + // Sanity check the ACLs. if err := config.CheckACL(); err != nil { return nil, err } - // Ensure we have a log output + // Ensure we have a log output and create a logger. if config.LogOutput == nil { config.LogOutput = os.Stderr } + logger := log.New(config.LogOutput, "", log.LstdFlags) - // Create the tls wrapper for outgoing connections + // Create the TLS wrapper for outgoing connections. tlsConf := config.tlsConfig() tlsWrap, err := tlsConf.OutgoingTLSWrapper() if err != nil { return nil, err } - // Get the incoming tls config + // Get the incoming TLS config. incomingTLS, err := tlsConf.IncomingTLSConfig() if err != nil { return nil, err } - // Create a logger - logger := log.New(config.LogOutput, "", log.LstdFlags) - - // Create the tombstone GC + // Create the tombstone GC. gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity) if err != nil { return nil, err } - // Create server + // Create server. s := &Server{ config: config, connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap), @@ -229,32 +235,32 @@ func NewServer(config *Config) (*Server, error) { shutdownCh: make(chan struct{}), } - // Initialize the authoritative ACL cache + // Initialize the authoritative ACL cache. s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclFault) if err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to create ACL cache: %v", err) } - // Set up the non-authoritative ACL cache + // Set up the non-authoritative ACL cache. if s.aclCache, err = newAclCache(config, logger, s.RPC); err != nil { s.Shutdown() return nil, err } - // Initialize the RPC layer + // Initialize the RPC layer. if err := s.setupRPC(tlsWrap); err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to start RPC layer: %v", err) } - // Initialize the Raft server + // Initialize the Raft server. if err := s.setupRaft(); err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to start Raft: %v", err) } - // Initialize the lan Serf + // Initialize the LAN Serf. s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false) if err != nil { @@ -263,7 +269,7 @@ func NewServer(config *Config) (*Server, error) { } go s.lanEventHandler() - // Initialize the wan Serf + // Initialize the WAN Serf. s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true) if err != nil { @@ -272,11 +278,18 @@ func NewServer(config *Config) (*Server, error) { } go s.wanEventHandler() - // Start listening for RPC requests + // Start ACL replication. + if s.IsACLReplicationEnabled() { + s.shutdownWait.Add(1) + go s.runACLReplication() + } + + // Start listening for RPC requests. go s.listen() - // Start the metrics handlers + // Start the metrics handlers. go s.sessionStats() + return s, nil } @@ -496,6 +509,7 @@ func (s *Server) Shutdown() error { s.shutdown = true close(s.shutdownCh) + s.shutdownWait.Wait() if s.serfLAN != nil { s.serfLAN.Shutdown() diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 4f99399b38..fdd36ea2b5 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -662,7 +662,7 @@ type IndexedSessions struct { QueryMeta } -// ACL is used to represent a token and it's rules +// ACL is used to represent a token and its rules type ACL struct { ID string Name string @@ -681,6 +681,21 @@ const ( ACLDelete = "delete" ) +// IsSame checks if one ACL is the same as another, without looking +// at the Raft information (that's why we didn't call it IsEqual). This is +// useful for seeing if an update would be idempotent for all the functional +// parts of the structure. +func (a *ACL) IsSame(other *ACL) bool { + if a.ID != other.ID || + a.Name != other.Name || + a.Type != other.Type || + a.Rules != other.Rules { + return false + } + + return true +} + // ACLRequest is used to create, update or delete an ACL type ACLRequest struct { Datacenter string @@ -693,6 +708,9 @@ func (r *ACLRequest) RequestDatacenter() string { return r.Datacenter } +// ACLRequests is a list of ACL change requests. +type ACLRequests []*ACLRequest + // ACLSpecificRequest is used to request an ACL by ID type ACLSpecificRequest struct { Datacenter string diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go index 1aa84b3af3..8caa66ffe8 100644 --- a/consul/structs/structs_test.go +++ b/consul/structs/structs_test.go @@ -58,6 +58,53 @@ func TestStructs_Implements(t *testing.T) { ) } +func TestStructs_ACL_IsSame(t *testing.T) { + acl := &ACL{ + ID: "guid", + Name: "An ACL for testing", + Type: "client", + Rules: "service \"\" { policy = \"read\" }", + } + if !acl.IsSame(acl) { + t.Fatalf("should be equal to itself") + } + + other := &ACL{ + ID: "guid", + Name: "An ACL for testing", + Type: "client", + Rules: "service \"\" { policy = \"read\" }", + RaftIndex: RaftIndex{ + CreateIndex: 1, + ModifyIndex: 2, + }, + } + if !acl.IsSame(other) || !other.IsSame(acl) { + t.Fatalf("should not care about Raft fields") + } + + check := func(twiddle, restore func()) { + if !acl.IsSame(other) || !other.IsSame(acl) { + t.Fatalf("should be the same") + } + + twiddle() + if acl.IsSame(other) || other.IsSame(acl) { + t.Fatalf("should not be the same") + } + + restore() + if !acl.IsSame(other) || !other.IsSame(acl) { + t.Fatalf("should be the same") + } + } + + check(func() { other.ID = "nope" }, func() { other.ID = "guid" }) + check(func() { other.Name = "nope" }, func() { other.Name = "An ACL for testing" }) + check(func() { other.Type = "management" }, func() { other.Type = "client" }) + check(func() { other.Rules = "" }, func() { other.Rules = "service \"\" { policy = \"read\" }" }) +} + // testServiceNode gives a fully filled out ServiceNode instance. func testServiceNode() *ServiceNode { return &ServiceNode{ From 18b817b575e118acc74036f87327af4a2ad0728b Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 3 Aug 2016 17:00:59 -0700 Subject: [PATCH 02/18] Hides the acl_replication_token from JSON output, like in /v1/agent/self. --- command/agent/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/agent/config.go b/command/agent/config.go index a83a2ddfd6..ac936ca114 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -456,7 +456,7 @@ type Config struct { // order to replicate them locally. Setting this to a non-empty value // also enables replication. Replication is only available in datacenters // other than the ACLDatacenter. - ACLReplicationToken string `mapstructure:"acl_replication_token"` + ACLReplicationToken string `mapstructure:"acl_replication_token" json:"-"` // Watches are used to monitor various endpoints and to invoke a // handler to act appropriately. These are managed entirely in the From e83172792386fd473dd3220c7e6ea38fe18936b0 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 3 Aug 2016 17:01:32 -0700 Subject: [PATCH 03/18] Activates fallback to replicated ACLs. --- acl/cache.go | 10 ++-- consul/acl.go | 114 +++++++++++++++++++++++++++----------- consul/acl_endpoint.go | 7 ++- consul/acl_replication.go | 6 +- consul/server.go | 26 ++++----- 5 files changed, 107 insertions(+), 56 deletions(-) diff --git a/acl/cache.go b/acl/cache.go index 069d13ba75..0debd1d20f 100644 --- a/acl/cache.go +++ b/acl/cache.go @@ -8,7 +8,7 @@ import ( ) // FaultFunc is a function used to fault in the parent, -// rules for an ACL given it's ID +// rules for an ACL given its ID type FaultFunc func(id string) (string, string, error) // aclEntry allows us to store the ACL with it's policy ID @@ -46,7 +46,7 @@ func NewCache(size int, faultfn FaultFunc) (*Cache, error) { // GetPolicy is used to get a potentially cached policy set. // If not cached, it will be parsed, and then cached. func (c *Cache) GetPolicy(rules string) (*Policy, error) { - return c.getPolicy(c.ruleID(rules), rules) + return c.getPolicy(RuleID(rules), rules) } // getPolicy is an internal method to get a cached policy, @@ -66,8 +66,8 @@ func (c *Cache) getPolicy(id, rules string) (*Policy, error) { } -// ruleID is used to generate an ID for a rule -func (c *Cache) ruleID(rules string) string { +// RuleID is used to generate an ID for a rule +func RuleID(rules string) string { return fmt.Sprintf("%x", md5.Sum([]byte(rules))) } @@ -112,7 +112,7 @@ func (c *Cache) GetACL(id string) (ACL, error) { if err != nil { return nil, err } - ruleID := c.ruleID(rules) + ruleID := RuleID(rules) // Check for a compiled ACL policyID := c.policyID(parentID, ruleID) diff --git a/consul/acl.go b/consul/acl.go index d2a6461731..7a472134fb 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -44,7 +44,7 @@ var ( permissionDeniedErr = errors.New(permissionDenied) ) -// aclCacheEntry is used to cache non-authoritative ACL's +// aclCacheEntry is used to cache non-authoritative ACLs // If non-authoritative, then we must respect a TTL type aclCacheEntry struct { ACL acl.ACL @@ -52,9 +52,14 @@ type aclCacheEntry struct { ETag string } -// aclFault is used to fault in the rules for an ACL if we take a miss -func (s *Server) aclFault(id string) (string, string, error) { +// aclLocalFault is used by the authoritative ACL cache to fault in the rules +// for an ACL if we take a miss. This goes directly to the state store, so it +// assumes its running in the ACL datacenter, or in a non-ACL datacenter when +// using its replicated ACLs during an outage. +func (s *Server) aclLocalFault(id string) (string, string, error) { defer metrics.MeasureSince([]string{"consul", "acl", "fault"}, time.Now()) + + // Query the state store. state := s.fsm.State() _, acl, err := state.ACLGet(id) if err != nil { @@ -64,19 +69,23 @@ func (s *Server) aclFault(id string) (string, string, error) { return "", "", errors.New(aclNotFound) } - // Management tokens have no policy and inherit from the - // 'manage' root policy + // Management tokens have no policy and inherit from the 'manage' root + // policy. if acl.Type == structs.ACLTypeManagement { return "manage", "", nil } - // Otherwise use the base policy + // Otherwise use the default policy. return s.config.ACLDefaultPolicy, acl.Rules, nil } -// resolveToken is used to resolve an ACL if any is appropriate +// resolveToken is the primary interface used by ACL-checkers (such as an +// endpoint handling a request) to resolve a token. If ACLs aren't enabled +// then this will return a nil token, otherwise it will attempt to use local +// cache and ultimately the ACL datacenter to get the policy associated with the +// token. func (s *Server) resolveToken(id string) (acl.ACL, error) { - // Check if there is no ACL datacenter (ACL's disabled) + // Check if there is no ACL datacenter (ACLs disabled) authDC := s.config.ACLDatacenter if len(authDC) == 0 { return nil, nil @@ -108,23 +117,30 @@ type aclCache struct { config *Config logger *log.Logger - // acls is a non-authoritative ACL cache + // acls is a non-authoritative ACL cache. acls *lru.Cache - // aclPolicyCache is a policy cache + // aclPolicyCache is a non-authoritative policy cache. policies *lru.Cache - // The RPC function used to talk to the client/server + // rpc is a function used to talk to the client/server. rpc rpcFn + + // local is a function used to look for an ACL locally if replication is + // enabled. This will be nil if replication isn't enabled. + local acl.FaultFunc } -// newAclCache returns a new cache layer for ACLs and policies -func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error) { +// newAclCache returns a new non-authoritative cache for ACLs. This is used for +// performance, and is used inside the ACL datacenter on non-leader servers, and +// outside the ACL datacenter everywhere. +func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFunc) (*aclCache, error) { var err error cache := &aclCache{ config: conf, logger: logger, rpc: rpc, + local: local, } // Initialize the non-authoritative ACL cache @@ -142,17 +158,16 @@ func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error) return cache, nil } -// lookupACL is used when we are non-authoritative, and need -// to resolve an ACL +// lookupACL is used when we are non-authoritative, and need to resolve an ACL. func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) { - // Check the cache for the ACL + // Check the cache for the ACL. var cached *aclCacheEntry raw, ok := c.acls.Get(id) if ok { cached = raw.(*aclCacheEntry) } - // Check for live cache + // Check for live cache. if cached != nil && time.Now().Before(cached.Expires) { metrics.IncrCounter([]string{"consul", "acl", "cache_hit"}, 1) return cached.ACL, nil @@ -160,7 +175,7 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) { metrics.IncrCounter([]string{"consul", "acl", "cache_miss"}, 1) } - // Attempt to refresh the policy + // Attempt to refresh the policy from the ACL datacenter via an RPC. args := structs.ACLPolicyRequest{ Datacenter: authDC, ACL: id, @@ -168,29 +183,62 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) { if cached != nil { args.ETag = cached.ETag } - var out structs.ACLPolicy - err := c.rpc("ACL.GetPolicy", &args, &out) - - // Handle the happy path + var reply structs.ACLPolicy + err := c.rpc("ACL.GetPolicy", &args, &reply) if err == nil { - return c.useACLPolicy(id, authDC, cached, &out) + return c.useACLPolicy(id, authDC, cached, &reply) } - // Check for not-found + // Check for not-found, which will cause us to bail immediately. For any + // other error we report it in the logs but can continue. if strings.Contains(err.Error(), aclNotFound) { return nil, errors.New(aclNotFound) } else { - s := id - // Print last 3 chars of the token if long enough, otherwise completly hide it - if len(s) > 3 { - s = fmt.Sprintf("token ending in '%s'", s[len(s)-3:]) - } else { - s = redactedToken - } - c.logger.Printf("[ERR] consul.acl: Failed to get policy for %s: %v", s, err) + // TODO (slackpad) - We used to print a few characters of the + // token here if the token was long enough. This was bugging me + // so I deleted it. We should probably print a hash of the token, + // or better yet let's add another ID to tokens to identify them + // without giving away their privileges. + c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err) } - // Unable to refresh, apply the down policy + // At this point we might have an expired cache entry and we know that + // there was a problem getting the ACL from the ACL datacenter. If a + // local ACL fault function is registered to query replicated ACL data, + // and the user's policy allows it, we will try locally before we give + // up. + if c.local != nil && c.config.ACLDownPolicy == "extend-cache" { + parent, rules, err := c.local(id) + if err != nil { + // We don't make an exception here for ACLs that aren't + // found locally. It seems more robust to use an expired + // cached entry (if we have one) rather than ignore it + // for the case that replication was a bit behind and + // didn't have the ACL yet. + c.logger.Printf("[DEBUG] consul.acl: Failed to get policy from replicated ACLs: %v", err) + goto ACL_DOWN + } + + policy, err := acl.Parse(rules) + if err != nil { + c.logger.Printf("[DEBUG] consul.acl: Failed to parse policy for replicated ACL: %v", err) + goto ACL_DOWN + } + policy.ID = acl.RuleID(rules) + + // Fake up an ACL datacenter reply and inject it into the cache. + // Note we use the local TTL here, so this'll be used for that + // amount of time even once the ACL datacenter becomes available. + metrics.IncrCounter([]string{"consul", "acl", "replication_hit"}, 1) + reply.ETag = makeACLETag(parent, policy) + reply.TTL = c.config.ACLTTL + reply.Parent = parent + reply.Policy = policy + return c.useACLPolicy(id, authDC, cached, &reply) + } + +ACL_DOWN: + // Unable to refresh, apply the down policy. switch c.config.ACLDownPolicy { case "allow": return acl.AllowAll(), nil diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index d14c90d289..19776ac5ea 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -161,6 +161,11 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest, }) } +// makeACLETag returns an ETag for the given parent and policy. +func makeACLETag(parent string, policy *acl.Policy) string { + return fmt.Sprintf("%s:%s", parent, policy.ID) +} + // GetPolicy is used to retrieve a compiled policy object with a TTL. Does not // support a blocking query. func (a *ACL) GetPolicy(args *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error { @@ -181,7 +186,7 @@ func (a *ACL) GetPolicy(args *structs.ACLPolicyRequest, reply *structs.ACLPolicy // Generate an ETag conf := a.srv.config - etag := fmt.Sprintf("%s:%s", parent, policy.ID) + etag := makeACLETag(parent, policy) // Setup the response reply.ETag = etag diff --git a/consul/acl_replication.go b/consul/acl_replication.go index af4aebffb7..ca530453f7 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -179,6 +179,7 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { time.Sleep(1*time.Second - elapsed) ops, start = 0, time.Now() } + ops++ // Note that we are using the single ACL interface here and not // performing all this inside a single transaction. This is OK @@ -192,7 +193,6 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { if err := aclApplyInternal(s, change, &reply); err != nil { return err } - ops++ } return nil } @@ -232,6 +232,8 @@ func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) { lastRemoteIndex = 0 } + // Calculate the changes required to bring the state into sync and then + // apply them. changes := reconcileACLs(local, remote.ACLs, lastRemoteIndex) if err := s.updateLocalACLs(changes); err != nil { return 0, fmt.Errorf("failed to sync ACL changes: %v", err) @@ -252,8 +254,6 @@ func (s *Server) IsACLReplicationEnabled() bool { // runACLReplication is a long-running goroutine that will attempt to replicate // ACLs while the server is the leader, until the shutdown channel closes. func (s *Server) runACLReplication() { - defer s.shutdownWait.Done() - // Give each server's replicator a random initial phase for good // measure. select { diff --git a/consul/server.go b/consul/server.go index 02490c6ae0..5419f0b6ee 100644 --- a/consul/server.go +++ b/consul/server.go @@ -67,7 +67,7 @@ const ( // Server is Consul server which manages the service discovery, // health checking, DC forwarding, Raft, and multiple Serf pools. type Server struct { - // aclAuthCache is the authoritative ACL cache + // aclAuthCache is the authoritative ACL cache. aclAuthCache *acl.Cache // aclCache is the non-authoritative ACL cache. @@ -151,15 +151,10 @@ type Server struct { // shutdown and the associated members here are used in orchestrating // a clean shutdown. The shutdownCh is never written to, only closed to - // indicate a shutdown has been initiated. The shutdownWait group will - // be waited on after closing the shutdownCh, but before any other - // shutdown activities take place in the server. Anything added to the - // shutdownWait group will block all the rest of shutdown, so use this - // sparingly and carefully. + // indicate a shutdown has been initiated. shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex - shutdownWait sync.WaitGroup } // Holds the RPC endpoints @@ -236,16 +231,21 @@ func NewServer(config *Config) (*Server, error) { } // Initialize the authoritative ACL cache. - s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclFault) + s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault) if err != nil { s.Shutdown() - return nil, fmt.Errorf("Failed to create ACL cache: %v", err) + return nil, fmt.Errorf("Failed to create authoritative ACL cache: %v", err) } - // Set up the non-authoritative ACL cache. - if s.aclCache, err = newAclCache(config, logger, s.RPC); err != nil { + // Set up the non-authoritative ACL cache. A nil local function is given + // if ACL replication isn't enabled. + var local acl.FaultFunc + if s.IsACLReplicationEnabled() { + local = s.aclLocalFault + } + if s.aclCache, err = newAclCache(config, logger, s.RPC, local); err != nil { s.Shutdown() - return nil, err + return nil, fmt.Errorf("Failed to create non-authoritative ACL cache: %v", err) } // Initialize the RPC layer. @@ -280,7 +280,6 @@ func NewServer(config *Config) (*Server, error) { // Start ACL replication. if s.IsACLReplicationEnabled() { - s.shutdownWait.Add(1) go s.runACLReplication() } @@ -509,7 +508,6 @@ func (s *Server) Shutdown() error { s.shutdown = true close(s.shutdownCh) - s.shutdownWait.Wait() if s.serfLAN != nil { s.serfLAN.Shutdown() From 096d5ff3bbece73f44937fd4f11ba4d89cb8a09f Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 3 Aug 2016 18:44:24 -0700 Subject: [PATCH 04/18] Adds tests for the ACL reconcile algorithm. --- consul/acl_replication.go | 11 +- consul/acl_replication_test.go | 217 +++++++++++++++++++++++++++++++++ 2 files changed, 225 insertions(+), 3 deletions(-) create mode 100644 consul/acl_replication_test.go diff --git a/consul/acl_replication.go b/consul/acl_replication.go index ca530453f7..b7e1609309 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -41,13 +41,18 @@ type aclIterator struct { index int } +// newACLIterator returns a new ACL iterator. +func newACLIterator(acls structs.ACLs) *aclIterator { + return &aclIterator{acls: acls} +} + // Front returns the item at index position, or nil if the list is exhausted. func (a *aclIterator) Front() *structs.ACL { if a.index < len(a.acls) { return a.acls[a.index] + } else { + return nil } - - return nil } // Next advances the iterator to the next index. @@ -72,7 +77,7 @@ func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.A // Run through both lists and reconcile them. var changes structs.ACLRequests - localIter, remoteIter := &aclIterator{local, 0}, &aclIterator{remote, 0} + localIter, remoteIter := newACLIterator(local), newACLIterator(remote) for localIter.Front() != nil || remoteIter.Front() != nil { // If the local list is exhausted, then process this as a remote // add. We know from the loop condition that there's something diff --git a/consul/acl_replication_test.go b/consul/acl_replication_test.go new file mode 100644 index 0000000000..eaa9f0cee9 --- /dev/null +++ b/consul/acl_replication_test.go @@ -0,0 +1,217 @@ +package consul + +import ( + "fmt" + "reflect" + "sort" + "strconv" + "strings" + "testing" + + "github.com/hashicorp/consul/consul/structs" +) + +func TestACLReplication_Sorter(t *testing.T) { + acls := structs.ACLs{ + &structs.ACL{ID: "a"}, + &structs.ACL{ID: "b"}, + &structs.ACL{ID: "c"}, + } + + sorter := &aclIDSorter{acls} + if len := sorter.Len(); len != 3 { + t.Fatalf("bad: %d", len) + } + if !sorter.Less(0, 1) { + t.Fatalf("should be less") + } + if sorter.Less(1, 0) { + t.Fatalf("should not be less") + } + if !sort.IsSorted(sorter) { + t.Fatalf("should be sorted") + } + + expected := structs.ACLs{ + &structs.ACL{ID: "b"}, + &structs.ACL{ID: "a"}, + &structs.ACL{ID: "c"}, + } + sorter.Swap(0, 1) + if !reflect.DeepEqual(acls, expected) { + t.Fatalf("bad: %v", acls) + } + if sort.IsSorted(sorter) { + t.Fatalf("should not be sorted") + } + sort.Sort(sorter) + if !sort.IsSorted(sorter) { + t.Fatalf("should be sorted") + } +} + +func TestACLReplication_Iterator(t *testing.T) { + acls := structs.ACLs{} + + iter := newACLIterator(acls) + if front := iter.Front(); front != nil { + t.Fatalf("bad: %v", front) + } + iter.Next() + if front := iter.Front(); front != nil { + t.Fatalf("bad: %v", front) + } + + acls = structs.ACLs{ + &structs.ACL{ID: "a"}, + &structs.ACL{ID: "b"}, + &structs.ACL{ID: "c"}, + } + iter = newACLIterator(acls) + if front := iter.Front(); front != acls[0] { + t.Fatalf("bad: %v", front) + } + iter.Next() + if front := iter.Front(); front != acls[1] { + t.Fatalf("bad: %v", front) + } + iter.Next() + if front := iter.Front(); front != acls[2] { + t.Fatalf("bad: %v", front) + } + iter.Next() + if front := iter.Front(); front != nil { + t.Fatalf("bad: %v", front) + } +} + +func TestACLReplication_reconcileACLs(t *testing.T) { + parseACLs := func(raw string) structs.ACLs { + var acls structs.ACLs + for _, key := range strings.Split(raw, "|") { + if len(key) == 0 { + continue + } + + tuple := strings.Split(key, ":") + index, err := strconv.Atoi(tuple[1]) + if err != nil { + t.Fatalf("err: %v", err) + } + acl := &structs.ACL{ + ID: tuple[0], + Rules: tuple[2], + RaftIndex: structs.RaftIndex{ + ModifyIndex: uint64(index), + }, + } + acls = append(acls, acl) + } + return acls + } + + parseChanges := func(changes structs.ACLRequests) string { + var ret string + for i, change := range changes { + if i > 0 { + ret += "|" + } + ret += fmt.Sprintf("%s:%s:%s", change.Op, change.ACL.ID, change.ACL.Rules) + } + return ret + } + + tests := []struct { + local string + remote string + lastRemoteIndex uint64 + expected string + }{ + // Everything empty. + { + local: "", + remote: "", + lastRemoteIndex: 0, + expected: "", + }, + // First time with empty local. + { + local: "", + remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", + lastRemoteIndex: 0, + expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X", + }, + // Remote not sorted. + { + local: "", + remote: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X", + lastRemoteIndex: 0, + expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X", + }, + // Neither side sorted. + { + local: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X", + remote: "ccc:9:X|bbb:3:X|ddd:2:X|eee:11:X", + lastRemoteIndex: 0, + expected: "", + }, + // Fully replicated, nothing to do. + { + local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", + remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", + lastRemoteIndex: 0, + expected: "", + }, + // Change an ACL. + { + local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", + remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X", + lastRemoteIndex: 0, + expected: "set:ccc:Y", + }, + // Change an ACL, but mask the change by the last replicated + // index. This isn't how things work normally, but it proves + // we are skipping the full compare based on the index. + { + local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", + remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X", + lastRemoteIndex: 33, + expected: "", + }, + // Empty everything out. + { + local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", + remote: "", + lastRemoteIndex: 0, + expected: "delete:bbb:X|delete:ccc:X|delete:ddd:X|delete:eee:X", + }, + // Adds on the ends and in the middle. + { + local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", + remote: "aaa:99:X|bbb:3:X|ccc:9:X|ccx:101:X|ddd:2:X|eee:11:X|fff:102:X", + lastRemoteIndex: 0, + expected: "set:aaa:X|set:ccx:X|set:fff:X", + }, + // Deletes on the ends and in the middle. + { + local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", + remote: "ccc:9:X", + lastRemoteIndex: 0, + expected: "delete:bbb:X|delete:ddd:X|delete:eee:X", + }, + // Everything. + { + local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", + remote: "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X", + lastRemoteIndex: 0, + expected: "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X", + }, + } + for i, test := range tests { + local, remote := parseACLs(test.local), parseACLs(test.remote) + changes := reconcileACLs(local, remote, test.lastRemoteIndex) + if actual := parseChanges(changes); actual != test.expected { + t.Errorf("test case %d failed: %s", i, actual) + } + } +} From f44bc7e97a5bb486b7e42b13f96745ca386c7434 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 4 Aug 2016 07:46:59 -0700 Subject: [PATCH 05/18] Removes a TODO comment. Decided we don't need to log anything about the token here. If the token is not valid then the client will get an error about that, so anything that can happen here is related to talking to the server in the ACL datacenter, so not specific to the token. --- consul/acl.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/consul/acl.go b/consul/acl.go index 7a472134fb..a4da8211cf 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -194,11 +194,6 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) { if strings.Contains(err.Error(), aclNotFound) { return nil, errors.New(aclNotFound) } else { - // TODO (slackpad) - We used to print a few characters of the - // token here if the token was long enough. This was bugging me - // so I deleted it. We should probably print a hash of the token, - // or better yet let's add another ID to tokens to identify them - // without giving away their privileges. c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err) } From 0a9060bb84121edbe09189678089c1d3bc61bd6f Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 4 Aug 2016 16:33:40 -0700 Subject: [PATCH 06/18] Adds remaining core replication tests. --- consul/acl.go | 12 ++ consul/acl_replication.go | 2 +- consul/acl_replication_test.go | 217 ++++++++++++++++++++++++++++++++- 3 files changed, 229 insertions(+), 2 deletions(-) diff --git a/consul/acl.go b/consul/acl.go index a4da8211cf..e583df5c8b 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -197,6 +197,18 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) { c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err) } + // TODO (slackpad) - We could do a similar thing *within* the ACL + // datacenter if the leader isn't available. We have a local state + // store of the ACLs, so by populating the local member in this cache, + // it would fall back to the state store if there was a leader loss and + // the extend-cache policy was true. This feels subtle to explain and + // configure, and leader blips should be paved over by cache already, so + // we won't do this for now but should consider for the future. This is + // a lot different than the replication story where you might be cut off + // from the ACL datacenter for an extended period of time and need to + // carry on operating with the full set of ACLs as they were known + // before the partition. + // At this point we might have an expired cache entry and we know that // there was a problem getting the ACL from the ACL datacenter. If a // local ACL fault function is registered to query replicated ACL data, diff --git a/consul/acl_replication.go b/consul/acl_replication.go index b7e1609309..075a6b46dc 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -179,7 +179,7 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { // operations and wait out to the second before we continue. If // it's going slower than that, the sleep time will be negative // so we will just keep going without delay. - if ops > s.config.ACLReplicationApplyLimit { + if ops >= s.config.ACLReplicationApplyLimit { elapsed := time.Now().Sub(start) time.Sleep(1*time.Second - elapsed) ops, start = 0, time.Now() diff --git a/consul/acl_replication_test.go b/consul/acl_replication_test.go index eaa9f0cee9..3f9b854f26 100644 --- a/consul/acl_replication_test.go +++ b/consul/acl_replication_test.go @@ -2,13 +2,16 @@ package consul import ( "fmt" + "os" "reflect" "sort" "strconv" "strings" "testing" + "time" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" ) func TestACLReplication_Sorter(t *testing.T) { @@ -203,7 +206,7 @@ func TestACLReplication_reconcileACLs(t *testing.T) { { local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", remote: "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X", - lastRemoteIndex: 0, + lastRemoteIndex: 11, expected: "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X", }, } @@ -215,3 +218,215 @@ func TestACLReplication_reconcileACLs(t *testing.T) { } } } + +func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.ACLDatacenter = "dc1" + c.ACLReplicationToken = "secret" + c.ACLReplicationApplyLimit = 1 + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC, "dc2") + + changes := structs.ACLRequests{ + &structs.ACLRequest{ + Op: structs.ACLSet, + ACL: structs.ACL{ + ID: "secret", + Type: "client", + }, + }, + } + + // Under the limit, should be quick. + start := time.Now() + if err := s1.updateLocalACLs(changes); err != nil { + t.Fatalf("err: %v", err) + } + if dur := time.Now().Sub(start); dur > 500*time.Millisecond { + t.Fatalf("too slow: %9.6f", dur.Seconds()) + } + + changes = append(changes, + &structs.ACLRequest{ + Op: structs.ACLSet, + ACL: structs.ACL{ + ID: "secret", + Type: "client", + }, + }) + + // Over the limit, should be throttled. + start = time.Now() + if err := s1.updateLocalACLs(changes); err != nil { + t.Fatalf("err: %v", err) + } + if dur := time.Now().Sub(start); dur < 500*time.Millisecond { + t.Fatalf("too fast: %9.6f", dur.Seconds()) + } +} + +func TestACLReplication_IsACLReplicationEnabled(t *testing.T) { + // ACLs not enabled. + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + if s1.IsACLReplicationEnabled() { + t.Fatalf("should not be enabled") + } + + // ACLs enabled but not replication. + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.ACLDatacenter = "dc1" + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + if s2.IsACLReplicationEnabled() { + t.Fatalf("should not be enabled") + } + + // ACLs enabled with replication. + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.ACLDatacenter = "dc1" + c.ACLReplicationToken = "secret" + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + if !s3.IsACLReplicationEnabled() { + t.Fatalf("should be enabled") + } + + // ACLs enabled and replication token set, but inside the ACL datacenter + // so replication should be disabled. + dir4, s4 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.ACLDatacenter = "dc1" + c.ACLReplicationToken = "secret" + }) + defer os.RemoveAll(dir4) + defer s4.Shutdown() + if s4.IsACLReplicationEnabled() { + t.Fatalf("should not be enabled") + } +} + +func TestACLReplication(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.ACLDatacenter = "dc1" + c.ACLReplicationToken = "root" + c.ACLReplicationInterval = 0 + c.ACLReplicationApplyLimit = 1000000 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Try to join. + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfWANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForLeader(t, s1.RPC, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc2") + + // Create a bunch of new tokens. + var id string + for i := 0; i < 1000; i++ { + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testACLPolicy, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + } + + checkSame := func() (bool, error) { + _, remote, err := s1.fsm.State().ACLList() + if err != nil { + return false, err + } + _, local, err := s2.fsm.State().ACLList() + if err != nil { + return false, err + } + if len(remote) != len(local) { + return false, nil + } + for i, acl := range remote { + if !acl.IsSame(local[i]) { + return false, nil + } + } + return true, nil + } + + // Wait for the replica to converge. + testutil.WaitForResult(checkSame, func(err error) { + t.Fatalf("ACLs didn't converge") + }) + + // Create more new tokens. + for i := 0; i < 1000; i++ { + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testACLPolicy, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var dontCare string + if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Wait for the replica to converge. + testutil.WaitForResult(checkSame, func(err error) { + t.Fatalf("ACLs didn't converge") + }) + + // Delete a token. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLDelete, + ACL: structs.ACL{ + ID: id, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var dontCare string + if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the replica to converge. + testutil.WaitForResult(checkSame, func(err error) { + t.Fatalf("ACLs didn't converge") + }) +} From c54269882b90b0cdb436820d20d108ea7e48075c Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 4 Aug 2016 17:59:08 -0700 Subject: [PATCH 07/18] Adds a full integrated test for ACL replication. --- consul/acl_test.go | 122 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/consul/acl_test.go b/consul/acl_test.go index c738d7e3fe..323ebc6f82 100644 --- a/consul/acl_test.go +++ b/consul/acl_test.go @@ -614,6 +614,128 @@ func TestACL_DownPolicy_ExtendCache(t *testing.T) { } } +func TestACL_Replication(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.ACLDatacenter = "dc1" + c.ACLDefaultPolicy = "deny" + c.ACLDownPolicy = "extend-cache" + c.ACLReplicationToken = "root" + c.ACLReplicationInterval = 0 + c.ACLReplicationApplyLimit = 1000000 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc3" + c.ACLDatacenter = "dc1" + c.ACLDownPolicy = "deny" + c.ACLReplicationToken = "root" + c.ACLReplicationInterval = 0 + c.ACLReplicationApplyLimit = 1000000 + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + // Try to join. + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfWANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForLeader(t, s1.RPC, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc2") + testutil.WaitForLeader(t, s1.RPC, "dc3") + + // Create a new token. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testACLPolicy, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for replication to occur. + testutil.WaitForResult(func() (bool, error) { + _, acl, err := s2.fsm.State().ACLGet(id) + if err != nil { + return false, err + } + if acl == nil { + return false, nil + } + _, acl, err = s3.fsm.State().ACLGet(id) + if err != nil { + return false, err + } + if acl == nil { + return false, nil + } + return true, nil + }, func(err error) { + t.Fatalf("ACLs didn't converge") + }) + + // Kill the ACL datacenter. + s1.Shutdown() + + // Token should resolve on s2, which has replication + extend-cache. + acl, err := s2.resolveToken(id) + if err != nil { + t.Fatalf("err: %v", err) + } + if acl == nil { + t.Fatalf("missing acl") + } + + // Check the policy + if acl.KeyRead("bar") { + t.Fatalf("unexpected read") + } + if !acl.KeyRead("foo/test") { + t.Fatalf("unexpected failed read") + } + + // Although s3 has replication, and we verified that the ACL is there, + // it can not be used because of the down policy. + acl, err = s3.resolveToken(id) + if err != nil { + t.Fatalf("err: %v", err) + } + if acl == nil { + t.Fatalf("missing acl") + } + + // Check the policy. + if acl.KeyRead("bar") { + t.Fatalf("unexpected read") + } + if acl.KeyRead("foo/test") { + t.Fatalf("unexpected read") + } +} + func TestACL_MultiDC_Found(t *testing.T) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.ACLDatacenter = "dc1" From 734cc0b3d5e616cb1f02db72c687445b3551069d Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 4 Aug 2016 18:03:07 -0700 Subject: [PATCH 08/18] Increases the ACL cache size to 10k. --- consul/acl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consul/acl.go b/consul/acl.go index e583df5c8b..df8846698f 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -37,7 +37,7 @@ const ( redactedToken = "" // Maximum number of cached ACL entries - aclCacheSize = 256 + aclCacheSize = 10 * 1024 ) var ( From d29af2ddc79712547d13d9a11261104efb94b89e Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 4 Aug 2016 21:32:36 -0700 Subject: [PATCH 09/18] Adds an ACL replication status endpoint. --- command/agent/acl_endpoint.go | 17 ++++++++++ command/agent/acl_endpoint_test.go | 15 +++++++++ command/agent/http.go | 2 ++ consul/acl_endpoint.go | 22 +++++++++++++ consul/acl_endpoint_test.go | 26 +++++++++++++++ consul/acl_replication.go | 53 +++++++++++++++++++++++++----- consul/acl_replication_test.go | 13 +++++++- consul/server.go | 6 ++++ consul/structs/structs.go | 11 +++++++ 9 files changed, 156 insertions(+), 9 deletions(-) diff --git a/command/agent/acl_endpoint.go b/command/agent/acl_endpoint.go index 3bc054611b..b60502ce99 100644 --- a/command/agent/acl_endpoint.go +++ b/command/agent/acl_endpoint.go @@ -205,3 +205,20 @@ func (s *HTTPServer) ACLList(resp http.ResponseWriter, req *http.Request) (inter } return out.ACLs, nil } + +func (s *HTTPServer) ACLReplicationStatus(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Note that we do not forward to the ACL DC here. This is a query for + // any DC that's doing replication. + args := structs.DCSpecificRequest{} + s.parseSource(req, &args.Source) + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + // Make the request. + var out structs.ACLReplicationStatus + if err := s.agent.RPC("ACL.ReplicationStatus", &args, &out); err != nil { + return nil, err + } + return out, nil +} diff --git a/command/agent/acl_endpoint_test.go b/command/agent/acl_endpoint_test.go index 361d661ac6..836d5cd19f 100644 --- a/command/agent/acl_endpoint_test.go +++ b/command/agent/acl_endpoint_test.go @@ -218,3 +218,18 @@ func TestACLList(t *testing.T) { } }) } + +func TestACLReplicationStatus(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + req, err := http.NewRequest("GET", "/v1/acl/replication", nil) + resp := httptest.NewRecorder() + obj, err := srv.ACLReplicationStatus(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + _, ok := obj.(structs.ACLReplicationStatus) + if !ok { + t.Fatalf("should work") + } + }) +} diff --git a/command/agent/http.go b/command/agent/http.go index 92247ef2c8..a0fa287b9a 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -257,6 +257,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/acl/info/", s.wrap(s.ACLGet)) s.mux.HandleFunc("/v1/acl/clone/", s.wrap(s.ACLClone)) s.mux.HandleFunc("/v1/acl/list", s.wrap(s.ACLList)) + s.mux.HandleFunc("/v1/acl/replication", s.wrap(s.ACLReplicationStatus)) } else { s.mux.HandleFunc("/v1/acl/create", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/update", s.wrap(aclDisabled)) @@ -264,6 +265,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/acl/info/", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/clone/", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/list", s.wrap(aclDisabled)) + s.mux.HandleFunc("/v1/acl/replication", s.wrap(aclDisabled)) } s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral)) diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index 19776ac5ea..52cb5d228f 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -235,3 +235,25 @@ func (a *ACL) List(args *structs.DCSpecificRequest, return nil }) } + +// ReplicationStatus is used to retrieve the current ACL replication status. +func (a *ACL) ReplicationStatus(args *structs.DCSpecificRequest, + reply *structs.ACLReplicationStatus) error { + // This must be sent to the leader, so we fix the args since we are + // re-using a structure where we don't support all the options. + args.RequireConsistent = true + args.AllowStale = false + if done, err := a.srv.forward("ACL.ReplicationStatus", args, args, reply); done { + return err + } + + // There's no ACL token required here since this doesn't leak any + // sensitive information, and we don't want people to have to use + // management tokens if they are querying this via a health check. + + // Poll the latest status. + a.srv.aclReplicationStatusLock.RLock() + *reply = a.srv.aclReplicationStatus + a.srv.aclReplicationStatusLock.RUnlock() + return nil +} diff --git a/consul/acl_endpoint_test.go b/consul/acl_endpoint_test.go index 9871dc1ad1..e031730bf6 100644 --- a/consul/acl_endpoint_test.go +++ b/consul/acl_endpoint_test.go @@ -466,3 +466,29 @@ func TestACLEndpoint_List_Denied(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestACLEndpoint_ReplicationStatus(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc2" + c.ACLReplicationToken = "secret" + c.ACLReplicationInterval = 0 + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + getR := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var status structs.ACLReplicationStatus + err := msgpackrpc.CallWithCodec(codec, "ACL.ReplicationStatus", &getR, &status) + if err != nil { + t.Fatalf("err: %v", err) + } + if !status.Enabled || !status.Running || status.SourceDatacenter != "dc2" { + t.Fatalf("bad: %#v", status) + } +} diff --git a/consul/acl_replication.go b/consul/acl_replication.go index 075a6b46dc..c7a5f3dd13 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -256,9 +256,33 @@ func (s *Server) IsACLReplicationEnabled() bool { len(s.config.ACLReplicationToken) > 0 } +// updateACLReplicationStatus safely updates the ACL replication status. +func (s *Server) updateACLReplicationStatus(status structs.ACLReplicationStatus) { + // Fixup the times to shed some useless precision to ease formattting, + // and always report UTC. + status.LastError = status.LastError.Round(time.Second).UTC() + status.LastSuccess = status.LastSuccess.Round(time.Second).UTC() + + // Set the shared state. + s.aclReplicationStatusLock.Lock() + s.aclReplicationStatus = status + s.aclReplicationStatusLock.Unlock() +} + // runACLReplication is a long-running goroutine that will attempt to replicate // ACLs while the server is the leader, until the shutdown channel closes. func (s *Server) runACLReplication() { + var status structs.ACLReplicationStatus + status.Enabled = true + status.SourceDatacenter = s.config.ACLDatacenter + s.updateACLReplicationStatus(status) + + // Show that it's not running on the way out. + defer func() { + status.Running = false + s.updateACLReplicationStatus(status) + }() + // Give each server's replicator a random initial phase for good // measure. select { @@ -266,26 +290,39 @@ func (s *Server) runACLReplication() { case <-s.shutdownCh: } + // We are fairly conservative with the lastRemoteIndex so that after a + // leadership change or an error we re-sync everything (we also don't + // want to block the first time after one of these events so we can + // show a successful sync in the status endpoint). var lastRemoteIndex uint64 - var wasActive bool replicate := func() { - if !wasActive { + if !status.Running { + lastRemoteIndex = 0 // Re-sync everything. + status.Running = true + s.updateACLReplicationStatus(status) s.logger.Printf("[INFO] consul: ACL replication started") - wasActive = true } - var err error - lastRemoteIndex, err = s.replicateACLs(lastRemoteIndex) + index, err := s.replicateACLs(lastRemoteIndex) if err != nil { + lastRemoteIndex = 0 // Re-sync everything. + status.LastError = time.Now() + s.updateACLReplicationStatus(status) s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err) } else { - s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", lastRemoteIndex) + lastRemoteIndex = index + status.ReplicatedIndex = index + status.LastSuccess = time.Now() + s.updateACLReplicationStatus(status) + s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", index) } } pause := func() { - if wasActive { + if status.Running { + lastRemoteIndex = 0 // Re-sync everything. + status.Running = false + s.updateACLReplicationStatus(status) s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)") - wasActive = false } } diff --git a/consul/acl_replication_test.go b/consul/acl_replication_test.go index 3f9b854f26..0f60fc69a8 100644 --- a/consul/acl_replication_test.go +++ b/consul/acl_replication_test.go @@ -364,7 +364,7 @@ func TestACLReplication(t *testing.T) { } checkSame := func() (bool, error) { - _, remote, err := s1.fsm.State().ACLList() + index, remote, err := s1.fsm.State().ACLList() if err != nil { return false, err } @@ -380,6 +380,17 @@ func TestACLReplication(t *testing.T) { return false, nil } } + + var status structs.ACLReplicationStatus + s2.aclReplicationStatusLock.RLock() + status = s2.aclReplicationStatus + s2.aclReplicationStatusLock.RUnlock() + if !status.Enabled || !status.Running || + status.ReplicatedIndex != index || + status.SourceDatacenter != "dc1" { + return false, nil + } + return true, nil } diff --git a/consul/server.go b/consul/server.go index 5419f0b6ee..e31b928453 100644 --- a/consul/server.go +++ b/consul/server.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/state" + "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/raft" "github.com/hashicorp/raft-boltdb" @@ -149,6 +150,11 @@ type Server struct { // for the KV tombstones tombstoneGC *state.TombstoneGC + // aclReplicationStatus (and its associated lock) provide information + // about the health of the ACL replication goroutine. + aclReplicationStatus structs.ACLReplicationStatus + aclReplicationStatusLock sync.RWMutex + // shutdown and the associated members here are used in orchestrating // a clean shutdown. The shutdownCh is never written to, only closed to // indicate a shutdown has been initiated. diff --git a/consul/structs/structs.go b/consul/structs/structs.go index fdd36ea2b5..c9ee486fc1 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -748,6 +748,17 @@ type ACLPolicy struct { QueryMeta } +// ACLReplicationStatus provides information about the health of the ACL +// replication system. +type ACLReplicationStatus struct { + Enabled bool + Running bool + SourceDatacenter string + ReplicatedIndex uint64 + LastSuccess time.Time + LastError time.Time +} + // Coordinate stores a node name with its associated network coordinate. type Coordinate struct { Node string From c499c4e7d32fd0c8482c50cb71cb690d9a897655 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Fri, 5 Aug 2016 00:23:28 -0700 Subject: [PATCH 10/18] Updates documentation for ACL replication. --- .../source/docs/agent/http/acl.html.markdown | 54 ++++ .../source/docs/agent/options.html.markdown | 11 + .../source/docs/internals/acl.html.markdown | 270 ++++++++++-------- 3 files changed, 222 insertions(+), 113 deletions(-) diff --git a/website/source/docs/agent/http/acl.html.markdown b/website/source/docs/agent/http/acl.html.markdown index b6b30c5758..941d1b6be6 100644 --- a/website/source/docs/agent/http/acl.html.markdown +++ b/website/source/docs/agent/http/acl.html.markdown @@ -17,6 +17,7 @@ The following endpoints are supported: * [`/v1/acl/info/`](#acl_info): Queries the policy of a given token * [`/v1/acl/clone/`](#acl_clone): Creates a new token by cloning an existing token * [`/v1/acl/list`](#acl_list): Lists all the active tokens +* [`/v1/acl/replication`](#acl_replication_status): Checks status of ACL replication ### /v1/acl/create @@ -166,3 +167,56 @@ It returns a JSON body like this: ... ] ``` + +### /v1/acl/replication + +The endpoint must be hit with a GET. It returns the status of the +[ACL replication](/docs/internals/acl.html#replication) process in +the datacenter. This is intended to be used by operators, or by +automation checking the health of ACL replication. + +By default, the datacenter of the agent is queried; however, the dc can be provided +using the "?dc=" query parameter. + +It returns a JSON body like this: + +```javascript +{ + "Enabled": true, + "Running": true, + "SourceDatacenter": "dc1", + "ReplicatedIndex": 1976, + "LastSuccess": "2016-08-05T06:28:58Z", + "LastError": "2016-08-05T06:28:28Z" +} +``` + +`Enabled` reports whether ACL replication is enabled for the datacenter. + +`Running` reports whether the ACL replication process is running. The process +may take approximately 60 seconds to begin running after a leader election occurs. + +`SourceDatacenter` is the authoritative ACL datacenter that ACLs are being +replicated from, and will match the +[`acl_datacenter`](/docs/agent/options.html#acl_datacenter) configuration. + +`ReplicatedIndex` is the last index that was successfully replicated. You can +compare this to the `X-Consul-Index` header returned by the [`/v1/acl/list`](#acl_list) +endpoint to determine if the replication process has gotten all available +ACLs. Note that replication runs as a background process approximately every 30 +seconds, and that local updates are rate limited to 100 update/second, so so it +may take several minutes to perform the initial sync of a large set of ACLs. +After the initial sync, replica lag should be on the order of about 30 seconds. + +`LastSuccess` is the UTC time of the last successful sync operation. Note that +since ACL replication is done with a blocking query, this may not update for up +to 5 minutes if there have been no ACL changes to replicate. A zero value of +"0001-01-01T00:00:00Z" will be present if no sync has been successful. + +`LastError` is the UTC time of the last error encountered during a sync operation. +If this time is later than `LastSuccess`, you can assume the replication process +is not in a good state. A zero value of "0001-01-01T00:00:00Z" will be present if +no sync has resulted in an error. + +Please see the [ACL replication](/docs/internals/acl.html#replication) +section of the internals guide for more details. diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index 91e031e3df..354c69c1db 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -351,6 +351,17 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass token. When you provide a value, it can be any string value. Using a UUID would ensure that it looks the same as the other tokens, but isn't strictly necessary. +* `acl_replication_token` - + Only used for servers outside the [`acl_datacenter`](#acl_datacenter) running Consul 0.7 or later. + When provided, this will enable [ACL replication](/docs/internals/acl.html#replication) using this + token to retrieve and replicate the ACLs to the non-authoritative local datacenter. +

+ If there's a partition or other outage affecting the authoritative datacenter, and the + [`acl_down_policy`](/docs/agent/options.html#acl_down_policy) is set to "extend-cache", tokens not + in the cache can be resolved during the outage using the replicated set of ACLs. Please see the + [ACL replication](/docs/internals/acl.html#replication) section of the internals guide for more + details. + * `acl_token` - When provided, the agent will use this token when making requests to the Consul servers. Clients can override this token on a per-request basis by providing the "?token" query parameter. When not provided, the empty token, which maps to diff --git a/website/source/docs/internals/acl.html.markdown b/website/source/docs/internals/acl.html.markdown index 03311e0b21..210c334cfe 100644 --- a/website/source/docs/internals/acl.html.markdown +++ b/website/source/docs/internals/acl.html.markdown @@ -39,14 +39,24 @@ prior versions do not provide a token. This is handled by the special "anonymous token. If no token is provided, the rules associated with the anonymous token are automatically applied: this allows policy to be enforced on legacy clients. +ACLs can also act in either a whitelist or blacklist mode depending +on the configuration of +[`acl_default_policy`](/docs/agent/options.html#acl_default_policy). If the +default policy is to deny all actions, then token rules can be set to whitelist +specific actions. In the inverse, the allow all default behavior is a blacklist +where rules are used to prohibit actions. By default, Consul will allow all +actions. + +#### ACL Datacenter + Enforcement is always done by the server nodes. All servers must be configured to provide an [`acl_datacenter`](/docs/agent/options.html#acl_datacenter) which -enables ACL enforcement but also specifies the authoritative datacenter. Consul does not -replicate data cross-WAN and instead relies on [RPC forwarding](/docs/internals/architecture.html) -to support Multi-Datacenter configurations. However, because requests can be made +enables ACL enforcement but also specifies the authoritative datacenter. Consul +relies on [RPC forwarding](/docs/internals/architecture.html) to support +Multi-Datacenter configurations. However, because requests can be made across datacenter boundaries, ACL tokens must be valid globally. To avoid -replication issues, a single datacenter is considered authoritative and stores -all the tokens. +consistency issues, a single datacenter is considered authoritative and stores +the canonical set of tokens. When a request is made to a server in a non-authoritative datacenter server, it must be resolved into the appropriate policy. This is done by reading the token @@ -55,7 +65,9 @@ from the authoritative server and caching the result for a configurable of caching is that the cache TTL is an upper bound on the staleness of policy that is enforced. It is possible to set a zero TTL, but this has adverse performance impacts, as every request requires refreshing the policy via a -cross-datacenter WAN call. +cross-datacenter WAN RPC call. + +#### Outages and ACL Replication The Consul ACL system is designed with flexible rules to accommodate for an outage of the [`acl_datacenter`](/docs/agent/options.html#acl_datacenter) or networking @@ -66,114 +78,46 @@ choices to tune behavior. It is possible to deny or permit all actions or to ign cache TTLs and enter a fail-safe mode. The default is to ignore cache TTLs for any previously resolved tokens and to deny any uncached tokens. -ACLs can also act in either a whitelist or blacklist mode depending -on the configuration of -[`acl_default_policy`](/docs/agent/options.html#acl_default_policy). If the -default policy is to deny all actions, then token rules can be set to whitelist -specific actions. In the inverse, the allow all default behavior is a blacklist -where rules are used to prohibit actions. By default, Consul will allow all -actions. + +Consul 0.7 added an ACL Replication capability that can allow non-authoritative +datacenter servers to resolve even uncached tokens. This is enabled by setting an +[`acl_replication_token`](/docs/agent/options.html#acl_replication_token) in the +configuration on the servers in the non-authoritative datacenters. With replication +enabled, the servers will maintain a replica of the authoritative datacenter's full +set of ACLs on the non-authoritative servers. -### Blacklist mode and `consul exec` +Replication occurs with a background process that looks for new ACLs approximately +every 30 seconds. Replicated changes are written at a rate that's throttled to +100 updates/second, so it may take several minutes to perform the initial sync of +a large set of ACLs. -If you set [`acl_default_policy`](/docs/agent/options.html#acl_default_policy) -to `deny`, the `anonymous` token won't have permission to read the default -`_rexec` prefix; therefore, Consul agents using the `anonymous` token -won't be able to perform [`consul exec`](/docs/commands/exec.html) actions. +If there's a partition or other outage affecting the authoritative datacenter, +and the [`acl_down_policy`](/docs/agent/options.html#acl_down_policy) +is set to "extend-cache", tokens will be resolved during the outage using the +replicated set of ACLs. An [ACL replication status](http://localhost:4567/docs/agent/http/acl.html#acl_replication_status) +endpoint is available to monitor the health of the replication process. -Here's why: the agents need read/write permission to the `_rexec` prefix for -[`consul exec`](/docs/commands/exec.html) to work properly. They use that prefix -as the transport for most data. +Locally-resolved ACLs will be cached using the [`acl_ttl`](/docs/agent/options.html#acl_ttl) +setting of the non-authoritative datacenter, so these entries may persist in the +cache for up to the TTL, even after the authoritative datacenter comes back online. -You can enable [`consul exec`](/docs/commands/exec.html) from agents that are not -configured with a token by allowing the `anonymous` token to access that prefix. -This can be done by giving this rule to the `anonymous` token: +ACL replication can also be used to migrate ACLs from one datacenter to another +using a process like this: -```javascript -key "_rexec/" { - policy = "write" -} -``` +1. Enable ACL replication in all datacenters to allow continuation of service +during the migration, and to populate the target datacenter. Verify replication +is healthy and caught up to the current ACL index in the target datacenter +using the [ACL replication status](http://localhost:4567/docs/agent/http/acl.html#acl_replication_status) +endpoint. +2. Turn down the old authoritative datacenter servers. +3. Rolling restart the servers in the target datacenter and change the +`acl_datacenter` configuration to itself. This will automatically turn off +replication and will enable the datacenter to start acting as the authoritative +datacenter, using its replicated ACLs from before. +3. Rolling restart the servers in other datacenters and change their `acl_datacenter` +configuration to the target datacenter. -Alternatively, you can, of course, add an explicit -[`acl_token`](/docs/agent/options.html#acl_token) to each agent, giving it access -to that prefix. - -### Blacklist mode and Service Discovery - -If your [`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is -set to `deny`, the `anonymous` token will be unable to read any service -information. This will cause the service discovery mechanisms in the REST API -and the DNS interface to return no results for any service queries. This is -because internally the API's and DNS interface consume the RPC interface, which -will filter results for services the token has no access to. - -You can allow all services to be discovered, mimicing the behavior of pre-0.6.0 -releases, by configuring this ACL rule for the `anonymous` token: - -``` -service "" { - policy = "read" -} -``` - -Note that the above will allow access for reading service information only. This -level of access allows discovering other services in the system, but is not -enough to allow the agent to sync its services and checks into the global -catalog during [anti-entropy](/docs/internals/anti-entropy.html). - -The most secure way of handling service registration and discovery is to run -Consul 0.6+ and issue tokens with explicit access for the services or service -prefixes which are expected to run on each agent. - -### Blacklist mode and Events - -Similar to the above, if your -[`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is set to -`deny`, the `anonymous` token will have no access to allow firing user events. -This deviates from pre-0.6.0 builds, where user events were completely -unrestricted. - -Events have their own first-class expression in the ACL syntax. To restore -access to user events from arbitrary agents, configure an ACL rule like the -following for the `anonymous` token: - -``` -event "" { - policy = "write" -} -``` - -As always, the more secure way to handle user events is to explicitly grant -access to each API token based on the events they should be able to fire. - -### Blacklist mode and Prepared Queries - -After Consul 0.6.3, significant changes were made to ACLs for prepared queries, -including a new `query` ACL policy. See [Prepared Query ACLs](#prepared_query_acls) below for more details. - -### Blacklist mode and Keyring Operations - -Consul 0.6 and later supports securing the encryption keyring operations using -ACL's. Encryption is an optional component of the gossip layer. More information -about Consul's keyring operations can be found on the [keyring -command](/docs/commands/keyring.html) documentation page. - -If your [`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is -set to `deny`, then the `anonymous` token will not have access to read or write -to the encryption keyring. The keyring policy is yet another first-class citizen -in the ACL syntax. You can configure the anonymous token to have free reign over -the keyring using a policy like the following: - -``` -keyring = "write" -``` - -Encryption keyring operations are sensitive and should be properly secured. It -is recommended that instead of configuring a wide-open policy like above, a -per-token policy is applied to maximize security. - -### Bootstrapping ACLs +#### Bootstrapping ACLs Bootstrapping the ACL system is done by providing an initial [`acl_master_token` configuration](/docs/agent/options.html#acl_master_token) which will be created @@ -187,8 +131,7 @@ for all servers. Once this is done, restart the current leader to force a leader ## Rule Specification A core part of the ACL system is a rule language which is used to describe the policy -that must be enforced. Consul supports ACLs for both [K/Vs](/intro/getting-started/kv.html) -and [services](/intro/getting-started/services.html). +that must be enforced. Key policies are defined by coupling a prefix with a policy. The rules are enforced using a longest-prefix match policy: Consul picks the most specific policy possible. The @@ -309,7 +252,108 @@ This is equivalent to the following JSON input: } ``` -## Services and Checks with ACLs +## Building ACL Policies + +#### Blacklist mode and `consul exec` + +If you set [`acl_default_policy`](/docs/agent/options.html#acl_default_policy) +to `deny`, the `anonymous` token won't have permission to read the default +`_rexec` prefix; therefore, Consul agents using the `anonymous` token +won't be able to perform [`consul exec`](/docs/commands/exec.html) actions. + +Here's why: the agents need read/write permission to the `_rexec` prefix for +[`consul exec`](/docs/commands/exec.html) to work properly. They use that prefix +as the transport for most data. + +You can enable [`consul exec`](/docs/commands/exec.html) from agents that are not +configured with a token by allowing the `anonymous` token to access that prefix. +This can be done by giving this rule to the `anonymous` token: + +```javascript +key "_rexec/" { + policy = "write" +} +``` + +Alternatively, you can, of course, add an explicit +[`acl_token`](/docs/agent/options.html#acl_token) to each agent, giving it access +to that prefix. + +#### Blacklist mode and Service Discovery + +If your [`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is +set to `deny`, the `anonymous` token will be unable to read any service +information. This will cause the service discovery mechanisms in the REST API +and the DNS interface to return no results for any service queries. This is +because internally the API's and DNS interface consume the RPC interface, which +will filter results for services the token has no access to. + +You can allow all services to be discovered, mimicing the behavior of pre-0.6.0 +releases, by configuring this ACL rule for the `anonymous` token: + +``` +service "" { + policy = "read" +} +``` + +Note that the above will allow access for reading service information only. This +level of access allows discovering other services in the system, but is not +enough to allow the agent to sync its services and checks into the global +catalog during [anti-entropy](/docs/internals/anti-entropy.html). + +The most secure way of handling service registration and discovery is to run +Consul 0.6+ and issue tokens with explicit access for the services or service +prefixes which are expected to run on each agent. + +#### Blacklist mode and Events + +Similar to the above, if your +[`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is set to +`deny`, the `anonymous` token will have no access to allow firing user events. +This deviates from pre-0.6.0 builds, where user events were completely +unrestricted. + +Events have their own first-class expression in the ACL syntax. To restore +access to user events from arbitrary agents, configure an ACL rule like the +following for the `anonymous` token: + +``` +event "" { + policy = "write" +} +``` + +As always, the more secure way to handle user events is to explicitly grant +access to each API token based on the events they should be able to fire. + +#### Blacklist mode and Prepared Queries + +After Consul 0.6.3, significant changes were made to ACLs for prepared queries, +including a new `query` ACL policy. See [Prepared Query ACLs](#prepared_query_acls) below for more details. + +#### Blacklist mode and Keyring Operations + +Consul 0.6 and later supports securing the encryption keyring operations using +ACL's. Encryption is an optional component of the gossip layer. More information +about Consul's keyring operations can be found on the [keyring +command](/docs/commands/keyring.html) documentation page. + +If your [`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is +set to `deny`, then the `anonymous` token will not have access to read or write +to the encryption keyring. The keyring policy is yet another first-class citizen +in the ACL syntax. You can configure the anonymous token to have free reign over +the keyring using a policy like the following: + +``` +keyring = "write" +``` + +Encryption keyring operations are sensitive and should be properly secured. It +is recommended that instead of configuring a wide-open policy like above, a +per-token policy is applied to maximize security. + +#### Services and Checks with ACLs Consul allows configuring ACL policies which may control access to service and check registration. In order to successfully register a service or check with @@ -330,7 +374,7 @@ methods of configuring ACL tokens to use for registration events: [HTTP API](/docs/agent/http.html) for operations that require them. -## Restricting service discovery with ACLs +#### Restricting service discovery with ACLs In Consul 0.6, the ACL system was extended to support restricting read access to service registrations. This allows tighter access control and limits the ability @@ -413,7 +457,7 @@ Capturing ACL Tokens is analogous to Token is similar to the complementary `SECURITY INVOKER` attribute. -#### ACL Implementation Changes +#### ACL Implementation Changes for Prepared Queries Prepared queries were originally introduced in Consul 0.6.0, and ACL behavior remained unchanged through version 0.6.3, but was then changed to allow better management of the From 04fc5c8a452561afc2d097193d1158a719386166 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 9 Aug 2016 00:11:00 -0700 Subject: [PATCH 11/18] Moves ACL ID generation down into the endpoint. We don't want ACL replication to have this behavior so it was a little dangerous to have in the shared helper function. --- consul/acl_endpoint.go | 60 ++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index 52cb5d228f..4f90410da3 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -19,9 +19,13 @@ type ACL struct { // this is a valid operation. It is used when users are updating ACLs, in which // case we check their token to make sure they have management privileges. It is // also used for ACL replication. We want to run the replicated ACLs through the -// same checks on the change itself. If an operation needs to generate an ID, -// routine will fill in an ID with the args as part of the request. +// same checks on the change itself. func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) error { + // All ACLs must have an ID by this point. + if args.ACL.ID == "" { + return fmt.Errorf("Missing ACL ID") + } + switch args.Op { case structs.ACLSet: // Verify the ACL type @@ -43,33 +47,8 @@ func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) erro return fmt.Errorf("ACL rule compilation failed: %v", err) } - // If no ID is provided, generate a new ID. This must - // be done prior to appending to the raft log, because the ID is not - // deterministic. Once the entry is in the log, the state update MUST - // be deterministic or the followers will not converge. - if args.ACL.ID == "" { - state := srv.fsm.State() - for { - if args.ACL.ID, err = uuid.GenerateUUID(); err != nil { - srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err) - return err - } - - _, acl, err := state.ACLGet(args.ACL.ID) - if err != nil { - srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err) - return err - } - if acl == nil { - break - } - } - } - case structs.ACLDelete: - if args.ACL.ID == "" { - return fmt.Errorf("Missing ACL ID") - } else if args.ACL.ID == anonymousToken { + if args.ACL.ID == anonymousToken { return fmt.Errorf("%s: Cannot delete anonymous token", permissionDenied) } @@ -115,6 +94,31 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { return permissionDeniedErr } + // If no ID is provided, generate a new ID. This must be done prior to + // appending to the Raft log, because the ID is not deterministic. Once + // the entry is in the log, the state update MUST be deterministic or + // the followers will not converge. + if args.Op == structs.ACLSet && args.ACL.ID == "" { + state := a.srv.fsm.State() + for { + var err error + args.ACL.ID, err = uuid.GenerateUUID() + if err != nil { + a.srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err) + return err + } + + _, acl, err := state.ACLGet(args.ACL.ID) + if err != nil { + a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err) + return err + } + if acl == nil { + break + } + } + } + // Do the apply now that this update is vetted. if err := aclApplyInternal(a.srv, args, reply); err != nil { return err From ae1cd5b47d8e5db29702d1d7f3da1dd74e59cc83 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 9 Aug 2016 11:00:22 -0700 Subject: [PATCH 12/18] Switches all ACL caches to 2Q. --- acl/cache.go | 25 +++++++++++++++++++------ acl/cache_test.go | 44 +++++++++++++++++++++++++++++++++++++------- consul/acl.go | 10 +++++----- 3 files changed, 61 insertions(+), 18 deletions(-) diff --git a/acl/cache.go b/acl/cache.go index 0debd1d20f..0387f9fbe9 100644 --- a/acl/cache.go +++ b/acl/cache.go @@ -21,9 +21,9 @@ type aclEntry struct { // Cache is used to implement policy and ACL caching type Cache struct { faultfn FaultFunc - aclCache *lru.Cache // Cache id -> acl - policyCache *lru.Cache // Cache policy -> acl - ruleCache *lru.Cache // Cache rules -> policy + aclCache *lru.TwoQueueCache // Cache id -> acl + policyCache *lru.TwoQueueCache // Cache policy -> acl + ruleCache *lru.TwoQueueCache // Cache rules -> policy } // NewCache constructs a new policy and ACL cache of a given size @@ -31,9 +31,22 @@ func NewCache(size int, faultfn FaultFunc) (*Cache, error) { if size <= 0 { return nil, fmt.Errorf("Must provide positive cache size") } - rc, _ := lru.New(size) - pc, _ := lru.New(size) - ac, _ := lru.New(size) + + rc, err := lru.New2Q(size) + if err != nil { + return nil, err + } + + pc, err := lru.New2Q(size) + if err != nil { + return nil, err + } + + ac, err := lru.New2Q(size) + if err != nil { + return nil, err + } + c := &Cache{ faultfn: faultfn, aclCache: ac, diff --git a/acl/cache_test.go b/acl/cache_test.go index f880bcaf4e..d144b0009d 100644 --- a/acl/cache_test.go +++ b/acl/cache_test.go @@ -5,7 +5,7 @@ import ( ) func TestCache_GetPolicy(t *testing.T) { - c, err := NewCache(1, nil) + c, err := NewCache(2, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -24,11 +24,23 @@ func TestCache_GetPolicy(t *testing.T) { t.Fatalf("should be cached") } - // Cache a new policy + // Work with some new policies to evict the original one _, err = c.GetPolicy(testSimplePolicy) if err != nil { t.Fatalf("err: %v", err) } + _, err = c.GetPolicy(testSimplePolicy) + if err != nil { + t.Fatalf("err: %v", err) + } + _, err = c.GetPolicy(testSimplePolicy2) + if err != nil { + t.Fatalf("err: %v", err) + } + _, err = c.GetPolicy(testSimplePolicy2) + if err != nil { + t.Fatalf("err: %v", err) + } // Test invalidation of p p3, err := c.GetPolicy("") @@ -44,12 +56,13 @@ func TestCache_GetACL(t *testing.T) { policies := map[string]string{ "foo": testSimplePolicy, "bar": testSimplePolicy2, + "baz": testSimplePolicy3, } faultfn := func(id string) (string, string, error) { return "deny", policies[id], nil } - c, err := NewCache(1, faultfn) + c, err := NewCache(2, faultfn) if err != nil { t.Fatalf("err: %v", err) } @@ -80,6 +93,18 @@ func TestCache_GetACL(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } + _, err = c.GetACL("bar") + if err != nil { + t.Fatalf("err: %v", err) + } + _, err = c.GetACL("baz") + if err != nil { + t.Fatalf("err: %v", err) + } + _, err = c.GetACL("baz") + if err != nil { + t.Fatalf("err: %v", err) + } acl3, err := c.GetACL("foo") if err != nil { @@ -100,7 +125,7 @@ func TestCache_ClearACL(t *testing.T) { return "deny", policies[id], nil } - c, err := NewCache(1, faultfn) + c, err := NewCache(16, faultfn) if err != nil { t.Fatalf("err: %v", err) } @@ -135,7 +160,7 @@ func TestCache_Purge(t *testing.T) { return "deny", policies[id], nil } - c, err := NewCache(1, faultfn) + c, err := NewCache(16, faultfn) if err != nil { t.Fatalf("err: %v", err) } @@ -167,7 +192,7 @@ func TestCache_GetACLPolicy(t *testing.T) { faultfn := func(id string) (string, string, error) { return "deny", policies[id], nil } - c, err := NewCache(1, faultfn) + c, err := NewCache(16, faultfn) if err != nil { t.Fatalf("err: %v", err) } @@ -220,7 +245,7 @@ func TestCache_GetACL_Parent(t *testing.T) { return "", "", nil } - c, err := NewCache(1, faultfn) + c, err := NewCache(16, faultfn) if err != nil { t.Fatalf("err: %v", err) } @@ -296,3 +321,8 @@ key "bar/" { policy = "read" } ` +var testSimplePolicy3 = ` +key "baz/" { + policy = "read" +} +` diff --git a/consul/acl.go b/consul/acl.go index df8846698f..278395f84e 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -112,16 +112,16 @@ func (s *Server) resolveToken(id string) (acl.ACL, error) { // rpcFn is used to make an RPC call to the client or server. type rpcFn func(string, interface{}, interface{}) error -// aclCache is used to cache ACL's and policies. +// aclCache is used to cache ACLs and policies. type aclCache struct { config *Config logger *log.Logger // acls is a non-authoritative ACL cache. - acls *lru.Cache + acls *lru.TwoQueueCache // aclPolicyCache is a non-authoritative policy cache. - policies *lru.Cache + policies *lru.TwoQueueCache // rpc is a function used to talk to the client/server. rpc rpcFn @@ -144,13 +144,13 @@ func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFun } // Initialize the non-authoritative ACL cache - cache.acls, err = lru.New(aclCacheSize) + cache.acls, err = lru.New2Q(aclCacheSize) if err != nil { return nil, fmt.Errorf("Failed to create ACL cache: %v", err) } // Initialize the ACL policy cache - cache.policies, err = lru.New(aclCacheSize) + cache.policies, err = lru.New2Q(aclCacheSize) if err != nil { return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err) } From 17537a0f108b1b9215cf522f8132321ab09b711c Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 9 Aug 2016 11:08:26 -0700 Subject: [PATCH 13/18] Moves ACL ID sorting interface onto the iterator. --- consul/acl_replication.go | 48 ++++++++++++++++------------------ consul/acl_replication_test.go | 2 +- 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/consul/acl_replication.go b/consul/acl_replication.go index c7a5f3dd13..c59077543e 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -10,30 +10,11 @@ import ( "github.com/hashicorp/consul/lib" ) -// aclIDSorter is used to make sure a given list of ACLs is sorted by token ID. -// This should always be true, but since this is crucial for correctness and we -// are accepting input from another server, we sort to make sure. -type aclIDSorter struct { - acls structs.ACLs -} - -// See sort.Interface. -func (a *aclIDSorter) Len() int { - return len(a.acls) -} - -// See sort.Interface. -func (a *aclIDSorter) Swap(i, j int) { - a.acls[i], a.acls[j] = a.acls[j], a.acls[i] -} - -// See sort.Interface. -func (a *aclIDSorter) Less(i, j int) bool { - return a.acls[i].ID < a.acls[j].ID -} - // aclIterator simplifies the algorithm below by providing a basic iterator that -// moves through a list of ACLs and returns nil when it's exhausted. +// moves through a list of ACLs and returns nil when it's exhausted. It also has +// methods for pre-sorting the ACLs being iterated over by ID, which should +// already be true, but since this is crucial for correctness and we are taking +// input from other servers, we sort to make sure. type aclIterator struct { acls structs.ACLs @@ -46,6 +27,21 @@ func newACLIterator(acls structs.ACLs) *aclIterator { return &aclIterator{acls: acls} } +// See sort.Interface. +func (a *aclIterator) Len() int { + return len(a.acls) +} + +// See sort.Interface. +func (a *aclIterator) Swap(i, j int) { + a.acls[i], a.acls[j] = a.acls[j], a.acls[i] +} + +// See sort.Interface. +func (a *aclIterator) Less(i, j int) bool { + return a.acls[i].ID < a.acls[j].ID +} + // Front returns the item at index position, or nil if the list is exhausted. func (a *aclIterator) Front() *structs.ACL { if a.index < len(a.acls) { @@ -72,12 +68,12 @@ func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.A // version of Consul, and sorted-ness is kind of a subtle property of // the state store indexing, it's prudent to make sure things are sorted // before we begin. - sort.Sort(&aclIDSorter{local}) - sort.Sort(&aclIDSorter{remote}) + localIter, remoteIter := newACLIterator(local), newACLIterator(remote) + sort.Sort(localIter) + sort.Sort(remoteIter) // Run through both lists and reconcile them. var changes structs.ACLRequests - localIter, remoteIter := newACLIterator(local), newACLIterator(remote) for localIter.Front() != nil || remoteIter.Front() != nil { // If the local list is exhausted, then process this as a remote // add. We know from the loop condition that there's something diff --git a/consul/acl_replication_test.go b/consul/acl_replication_test.go index 0f60fc69a8..6bb0bb9602 100644 --- a/consul/acl_replication_test.go +++ b/consul/acl_replication_test.go @@ -21,7 +21,7 @@ func TestACLReplication_Sorter(t *testing.T) { &structs.ACL{ID: "c"}, } - sorter := &aclIDSorter{acls} + sorter := &aclIterator{acls, 0} if len := sorter.Len(); len != 3 { t.Fatalf("bad: %d", len) } From a771b34de632b0c53fc12eae53b76d1790a7308e Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 9 Aug 2016 11:09:48 -0700 Subject: [PATCH 14/18] Returns from the shutdown wait right away. --- consul/acl_replication.go | 1 + 1 file changed, 1 insertion(+) diff --git a/consul/acl_replication.go b/consul/acl_replication.go index c59077543e..94642ebcf6 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -284,6 +284,7 @@ func (s *Server) runACLReplication() { select { case <-time.After(lib.RandomStagger(s.config.ACLReplicationInterval)): case <-s.shutdownCh: + return } // We are fairly conservative with the lastRemoteIndex so that after a From 5efd35c590c996d1e626de0e39152321c5dbd5be Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 9 Aug 2016 11:10:32 -0700 Subject: [PATCH 15/18] Clarifies replication index shown in the log message. --- consul/acl_replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consul/acl_replication.go b/consul/acl_replication.go index 94642ebcf6..2a74d9b182 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -311,7 +311,7 @@ func (s *Server) runACLReplication() { status.ReplicatedIndex = index status.LastSuccess = time.Now() s.updateACLReplicationStatus(status) - s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", index) + s.logger.Printf("[DEBUG] consul: ACL replication completed through remote index %d", index) } } pause := func() { From 11ad551204df2a14896f9b95c76d65ace9668179 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 9 Aug 2016 11:29:12 -0700 Subject: [PATCH 16/18] Switches to a smooth rate limit vs. a bursty one. --- consul/acl_replication.go | 21 ++++++++------------- consul/acl_replication_test.go | 8 ++++---- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/consul/acl_replication.go b/consul/acl_replication.go index 2a74d9b182..573b562513 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -168,20 +168,8 @@ func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, // UpdateLocalACLs is given a list of changes to apply in order to bring the // local ACLs in-line with the remote ACLs from the ACL datacenter. func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { - var ops int - start := time.Now() + minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit) for _, change := range changes { - // Do a very simple rate limit algorithm where we check every N - // operations and wait out to the second before we continue. If - // it's going slower than that, the sleep time will be negative - // so we will just keep going without delay. - if ops >= s.config.ACLReplicationApplyLimit { - elapsed := time.Now().Sub(start) - time.Sleep(1*time.Second - elapsed) - ops, start = 0, time.Now() - } - ops++ - // Note that we are using the single ACL interface here and not // performing all this inside a single transaction. This is OK // for two reasons. First, there's nothing else other than this @@ -191,9 +179,16 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { // next replication pass will clean up and check everything // again. var reply string + start := time.Now() if err := aclApplyInternal(s, change, &reply); err != nil { return err } + + // Do a smooth rate limit to wait out the min time allowed for + // each op. If this op took longer than the min, then the sleep + // time will be negative and we will just move on. + elapsed := time.Now().Sub(start) + time.Sleep(minTimePerOp - elapsed) } return nil } diff --git a/consul/acl_replication_test.go b/consul/acl_replication_test.go index 6bb0bb9602..f7f10752b8 100644 --- a/consul/acl_replication_test.go +++ b/consul/acl_replication_test.go @@ -240,12 +240,12 @@ func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) { }, } - // Under the limit, should be quick. + // Should be throttled to 1 Hz. start := time.Now() if err := s1.updateLocalACLs(changes); err != nil { t.Fatalf("err: %v", err) } - if dur := time.Now().Sub(start); dur > 500*time.Millisecond { + if dur := time.Now().Sub(start); dur < time.Second { t.Fatalf("too slow: %9.6f", dur.Seconds()) } @@ -258,12 +258,12 @@ func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) { }, }) - // Over the limit, should be throttled. + // Should be throttled to 1 Hz. start = time.Now() if err := s1.updateLocalACLs(changes); err != nil { t.Fatalf("err: %v", err) } - if dur := time.Now().Sub(start); dur < 500*time.Millisecond { + if dur := time.Now().Sub(start); dur < 2*time.Second { t.Fatalf("too fast: %9.6f", dur.Seconds()) } } From e07298594eb2a6295ac3f9c32c90c7361babca9a Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 9 Aug 2016 11:32:12 -0700 Subject: [PATCH 17/18] Adds I/O-sensitive metrics to ACL replication operations. --- consul/acl_replication.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/consul/acl_replication.go b/consul/acl_replication.go index 573b562513..3b767edb0b 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -150,6 +150,8 @@ func (s *Server) fetchLocalACLs() (structs.ACLs, error) { // datacenter. The lastIndex parameter is a hint about which remote index we // have replicated to, so this is expected to block until something changes. func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) { + defer metrics.MeasureSince([]string{"consul", "leader", "fetchRemoteACLs"}, time.Now()) + args := structs.DCSpecificRequest{ Datacenter: s.config.ACLDatacenter, QueryOptions: structs.QueryOptions{ @@ -168,6 +170,8 @@ func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, // UpdateLocalACLs is given a list of changes to apply in order to bring the // local ACLs in-line with the remote ACLs from the ACL datacenter. func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { + defer metrics.MeasureSince([]string{"consul", "leader", "updateLocalACLs"}, time.Now()) + minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit) for _, change := range changes { // Note that we are using the single ACL interface here and not From 12ad26e0fc55e5a06db3d297078009a92d2b6910 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 9 Aug 2016 11:33:42 -0700 Subject: [PATCH 18/18] Tweaks select style. --- consul/acl_replication.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consul/acl_replication.go b/consul/acl_replication.go index 3b767edb0b..57ffce2555 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -281,9 +281,10 @@ func (s *Server) runACLReplication() { // Give each server's replicator a random initial phase for good // measure. select { - case <-time.After(lib.RandomStagger(s.config.ACLReplicationInterval)): case <-s.shutdownCh: return + + case <-time.After(lib.RandomStagger(s.config.ACLReplicationInterval)): } // We are fairly conservative with the lastRemoteIndex so that after a