From 0f4e24f72c98d56bac1fe9d94dfc3dc7f98187c9 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 29 Aug 2017 22:35:22 -0500 Subject: [PATCH] Consolidate server lookup into one place and replace usages of localConsuls. --- agent/consul/rpc.go | 4 +- agent/consul/serf.go | 20 +++------ agent/consul/server.go | 18 +++----- agent/consul/server_address_lookup.go | 48 ++++++++++++++++------ agent/consul/server_address_lookup_test.go | 42 +++++++++++++++---- agent/consul/server_test.go | 6 +-- 6 files changed, 84 insertions(+), 54 deletions(-) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 41a1a6699a..baf0c4c61f 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -238,9 +238,7 @@ func (s *Server) getLeader() (bool, *metadata.Server) { } // Lookup the server - s.localLock.RLock() - server := s.localConsuls[leader] - s.localLock.RUnlock() + server, _ := s.serverLookup.GetServer(leader) // Server could be nil return false, server diff --git a/agent/consul/serf.go b/agent/consul/serf.go index d1e3d9012e..ac5f30b8ea 100644 --- a/agent/consul/serf.go +++ b/agent/consul/serf.go @@ -125,18 +125,14 @@ func (s *Server) localEvent(event serf.UserEvent) { // lanNodeJoin is used to handle join events on the LAN pool. func (s *Server) lanNodeJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := metadata.IsConsulServer(m) + ok, serverMeta := metadata.IsConsulServer(m) if !ok { continue } - s.logger.Printf("[INFO] consul: Adding LAN server %s", parts) + s.logger.Printf("[INFO] consul: Adding LAN server %s", serverMeta) - // See if it's configured as part of our DC. - if parts.Datacenter == s.config.Datacenter { - s.localLock.Lock() - s.localConsuls[raft.ServerAddress(parts.Addr.String())] = parts - s.localLock.Unlock() - } + // Update server lookup + s.serverLookup.AddServer(serverMeta) // If we're still expecting to bootstrap, may need to handle this. if s.config.BootstrapExpect != 0 { @@ -144,7 +140,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) { } // Update id to address map - s.serverAddressLookup.AddServer(parts.ID, parts.Addr.String()) + s.serverLookup.AddServer(serverMeta) // Kick the join flooders. s.FloodNotify() @@ -274,11 +270,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) { } s.logger.Printf("[INFO] consul: Removing LAN server %s", parts) - s.localLock.Lock() - delete(s.localConsuls, raft.ServerAddress(parts.Addr.String())) - s.localLock.Unlock() - // Update id to address map - s.serverAddressLookup.RemoveServer(parts.ID) + s.serverLookup.RemoveServer(parts) } } diff --git a/agent/consul/server.go b/agent/consul/server.go index 60f4bd79a3..5c7ec63cef 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -123,11 +123,6 @@ type Server struct { // strong consistency. fsm *consulFSM - // localConsuls is used to track the known consuls - // in the local datacenter. Used to do leader forwarding. - localConsuls map[raft.ServerAddress]*metadata.Server - localLock sync.RWMutex - // Logger uses the provided LogOutput logger *log.Logger @@ -171,8 +166,8 @@ type Server struct { // which SHOULD only consist of Consul servers serfWAN *serf.Serf - // fast lookup from id to server address to provide to the raft transport layer - serverAddressLookup *ServerAddressLookup + // serverLookup provides fast and thread-safe lookup by id and address + serverLookup *ServerLookup // floodLock controls access to floodCh. floodLock sync.RWMutex @@ -298,7 +293,6 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* connPool: connPool, eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), - localConsuls: make(map[raft.ServerAddress]*metadata.Server), logger: logger, reconcileCh: make(chan serf.Member, 32), router: router.NewRouter(logger, config.Datacenter), @@ -307,7 +301,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* reassertLeaderCh: make(chan chan error), sessionTimers: NewSessionTimers(), tombstoneGC: gc, - serverAddressLookup: NewServerAddressLookup(), + serverLookup: NewServerLookup(), shutdownCh: shutdownCh, } @@ -502,7 +496,7 @@ func (s *Server) setupRaft() error { Stream: s.raftLayer, MaxPool: 3, Timeout: 10 * time.Second, - ServerAddressProvider: s.serverAddressLookup, + ServerAddressProvider: s.serverLookup, } trans := raft.NewNetworkTransportWithConfig(transConfig) @@ -705,9 +699,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { return true } - s.localLock.RLock() - server, ok := s.localConsuls[address] - s.localLock.RUnlock() + server, ok := s.serverLookup.GetServer(address) if !ok { return false diff --git a/agent/consul/server_address_lookup.go b/agent/consul/server_address_lookup.go index 82d93a38b1..ac1d33937e 100644 --- a/agent/consul/server_address_lookup.go +++ b/agent/consul/server_address_lookup.go @@ -4,31 +4,53 @@ import ( "fmt" "sync" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/raft" ) -// serverIdToAddress is a map from id to address for servers in the LAN pool. -// used for fast lookup to satisfy the ServerAddressProvider interface -type ServerAddressLookup struct { - serverIdToAddress sync.Map +// ServerLookup encapsulates looking up servers by id and address +type ServerLookup struct { + lock sync.RWMutex + addressToServer map[raft.ServerAddress]*metadata.Server + IdToServer map[raft.ServerID]*metadata.Server } -func NewServerAddressLookup() *ServerAddressLookup { - return &ServerAddressLookup{} +func NewServerLookup() *ServerLookup { + return &ServerLookup{addressToServer: make(map[raft.ServerAddress]*metadata.Server), IdToServer: make(map[raft.ServerID]*metadata.Server)} } -func (sa *ServerAddressLookup) AddServer(id string, address string) { - sa.serverIdToAddress.Store(id, address) +func (sa *ServerLookup) AddServer(server *metadata.Server) { + sa.lock.Lock() + defer sa.lock.Unlock() + sa.addressToServer[raft.ServerAddress(server.Addr.String())] = server + sa.IdToServer[raft.ServerID(server.ID)] = server } -func (sa *ServerAddressLookup) RemoveServer(id string) { - sa.serverIdToAddress.Delete(id) +func (sa *ServerLookup) RemoveServer(server *metadata.Server) { + sa.lock.Lock() + defer sa.lock.Unlock() + delete(sa.addressToServer, raft.ServerAddress(server.Addr.String())) + delete(sa.IdToServer, raft.ServerID(server.ID)) } -func (sa *ServerAddressLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) { - val, ok := sa.serverIdToAddress.Load(string(id)) +// Implements the ServerAddressProvider interface +func (sa *ServerLookup) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) { + sa.lock.RLock() + defer sa.lock.RUnlock() + svr, ok := sa.IdToServer[id] if !ok { return "", fmt.Errorf("Could not find address for server id %v", id) } - return raft.ServerAddress(val.(string)), nil + return raft.ServerAddress(svr.Addr.String()), nil +} + +// GetServer looks up the server by address, returns a boolean if not found +func (sa *ServerLookup) GetServer(addr raft.ServerAddress) (*metadata.Server, bool) { + sa.lock.RLock() + defer sa.lock.RUnlock() + svr, ok := sa.addressToServer[addr] + if !ok { + return nil, false + } + return svr, true } diff --git a/agent/consul/server_address_lookup_test.go b/agent/consul/server_address_lookup_test.go index e76725b564..d1189bb6e1 100644 --- a/agent/consul/server_address_lookup_test.go +++ b/agent/consul/server_address_lookup_test.go @@ -3,14 +3,32 @@ package consul import ( "fmt" "testing" + + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/raft" ) -func TestServerAddressLookup(t *testing.T) { - lookup := NewServerAddressLookup() - addr := "72.0.0.17:8300" - lookup.AddServer("1", addr) +type testAddr struct { + addr string +} - got, err := lookup.ServerAddr("1") +func (ta *testAddr) Network() string { + return "tcp" +} + +func (ta *testAddr) String() string { + return ta.addr +} + +func TestServerLookup(t *testing.T) { + lookup := NewServerLookup() + addr := "72.0.0.17:8300" + id := "1" + + svr := &metadata.Server{ID: id, Addr: &testAddr{addr}} + lookup.AddServer(svr) + + got, err := lookup.ServerAddr(raft.ServerID(id)) if err != nil { t.Fatalf("Unexpected error:%v", err) } @@ -18,7 +36,15 @@ func TestServerAddressLookup(t *testing.T) { t.Fatalf("Expected %v but got %v", addr, got) } - lookup.RemoveServer("1") + server, ok := lookup.GetServer(raft.ServerAddress(addr)) + if !ok { + t.Fatalf("Expected lookup to return true") + } + if server.Addr.String() != addr { + t.Fatalf("Expected lookup to return address %v but got %v", addr, server.Addr) + } + + lookup.RemoveServer(svr) got, err = lookup.ServerAddr("1") expectedErr := fmt.Errorf("Could not find address for server id 1") @@ -26,5 +52,7 @@ func TestServerAddressLookup(t *testing.T) { t.Fatalf("Unexpected error, got %v wanted %v", err, expectedErr) } - lookup.RemoveServer("3") + svr2 := &metadata.Server{ID: "2", Addr: &testAddr{"123.4.5.6"}} + lookup.RemoveServer(svr2) + } diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 21dfe259f2..0d921e1268 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -342,7 +342,7 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) { if len(s2.router.GetDatacenters()) != 2 { r.Fatalf("remote consul missing") } - if len(s2.localConsuls) != 2 { + if len(s2.serverLookup.addressToServer) != 2 { r.Fatalf("local consul fellow s3 for s2 missing") } }) @@ -666,14 +666,12 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) { retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s2, 2)) }) // Have s2 make an RPC call to s1 - s2.localLock.RLock() var leader *metadata.Server - for _, server := range s2.localConsuls { + for _, server := range s2.serverLookup.addressToServer { if server.Name == s1.config.NodeName { leader = server } } - s2.localLock.RUnlock() if leader == nil { t.Fatal("no leader") }