diff --git a/agent/agent.go b/agent/agent.go index abce307c35..8c903a8580 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -703,16 +703,21 @@ func (a *Agent) consulConfig() (*consul.Config, error) { base.SerfLANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfLANProbeTimeout base.SerfLANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfLANSuspicionMult - base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddrWAN.IP.String() - base.SerfWANConfig.MemberlistConfig.BindPort = a.config.SerfBindAddrWAN.Port - base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.SerfAdvertiseAddrWAN.IP.String() - base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.SerfAdvertiseAddrWAN.Port - base.SerfWANConfig.MemberlistConfig.GossipVerifyIncoming = a.config.EncryptVerifyIncoming - base.SerfWANConfig.MemberlistConfig.GossipVerifyOutgoing = a.config.EncryptVerifyOutgoing - base.SerfWANConfig.MemberlistConfig.GossipInterval = a.config.ConsulSerfWANGossipInterval - base.SerfWANConfig.MemberlistConfig.ProbeInterval = a.config.ConsulSerfWANProbeInterval - base.SerfWANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfWANProbeTimeout - base.SerfWANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfWANSuspicionMult + if a.config.SerfBindAddrWAN != nil { + base.SerfWANConfig.MemberlistConfig.BindAddr = a.config.SerfBindAddrWAN.IP.String() + base.SerfWANConfig.MemberlistConfig.BindPort = a.config.SerfBindAddrWAN.Port + base.SerfWANConfig.MemberlistConfig.AdvertiseAddr = a.config.SerfAdvertiseAddrWAN.IP.String() + base.SerfWANConfig.MemberlistConfig.AdvertisePort = a.config.SerfAdvertiseAddrWAN.Port + base.SerfWANConfig.MemberlistConfig.GossipVerifyIncoming = a.config.EncryptVerifyIncoming + base.SerfWANConfig.MemberlistConfig.GossipVerifyOutgoing = a.config.EncryptVerifyOutgoing + base.SerfWANConfig.MemberlistConfig.GossipInterval = a.config.ConsulSerfWANGossipInterval + base.SerfWANConfig.MemberlistConfig.ProbeInterval = a.config.ConsulSerfWANProbeInterval + base.SerfWANConfig.MemberlistConfig.ProbeTimeout = a.config.ConsulSerfWANProbeTimeout + base.SerfWANConfig.MemberlistConfig.SuspicionMult = a.config.ConsulSerfWANSuspicionMult + } else { + // Disable serf WAN federation + base.SerfWANConfig = nil + } base.RPCAddr = a.config.RPCBindAddr base.RPCAdvertise = a.config.RPCAdvertiseAddr @@ -1019,6 +1024,7 @@ func (a *Agent) setupNodeID(config *config.RuntimeConfig) error { func (a *Agent) setupBaseKeyrings(config *consul.Config) error { // If the keyring file is disabled then just poke the provided key // into the in-memory keyring. + federationEnabled := config.SerfWANConfig != nil if a.config.DisableKeyringFile { if a.config.EncryptKey == "" { return nil @@ -1028,7 +1034,7 @@ func (a *Agent) setupBaseKeyrings(config *consul.Config) error { if err := loadKeyring(config.SerfLANConfig, keys); err != nil { return err } - if a.config.ServerMode { + if a.config.ServerMode && federationEnabled { if err := loadKeyring(config.SerfWANConfig, keys); err != nil { return err } @@ -1048,7 +1054,7 @@ func (a *Agent) setupBaseKeyrings(config *consul.Config) error { return err } } - if a.config.ServerMode { + if a.config.ServerMode && federationEnabled { if _, err := os.Stat(fileWAN); err != nil { if err := initKeyring(fileWAN, a.config.EncryptKey); err != nil { return err @@ -1063,7 +1069,7 @@ LOAD: if err := loadKeyringFile(config.SerfLANConfig); err != nil { return err } - if a.config.ServerMode { + if a.config.ServerMode && federationEnabled { if _, err := os.Stat(fileWAN); err == nil { config.SerfWANConfig.KeyringFile = fileWAN } diff --git a/agent/config/builder.go b/agent/config/builder.go index efcffde893..22e451bf5f 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -369,9 +369,6 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { if ipaddr.IsAny(b.stringVal(c.AdvertiseAddrWAN)) { return RuntimeConfig{}, fmt.Errorf("Advertise WAN address cannot be 0.0.0.0, :: or [::]") } - if serfPortWAN < 0 { - return RuntimeConfig{}, fmt.Errorf("ports.serf_wan must be a valid port from 1 to 65535") - } bindAddr := bindAddrs[0].(*net.IPAddr) advertiseAddr := b.makeIPAddr(b.expandFirstIP("advertise_addr", c.AdvertiseAddrLAN), bindAddr) @@ -411,14 +408,23 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { // derive other bind addresses from the bindAddr rpcBindAddr := b.makeTCPAddr(bindAddr, nil, serverPort) serfBindAddrLAN := b.makeTCPAddr(b.expandFirstIP("serf_lan", c.SerfBindAddrLAN), bindAddr, serfPortLAN) - serfBindAddrWAN := b.makeTCPAddr(b.expandFirstIP("serf_wan", c.SerfBindAddrWAN), bindAddr, serfPortWAN) + + // Only initialize serf WAN bind address when its enabled + var serfBindAddrWAN *net.TCPAddr + if serfPortWAN >= 0 { + serfBindAddrWAN = b.makeTCPAddr(b.expandFirstIP("serf_wan", c.SerfBindAddrWAN), bindAddr, serfPortWAN) + } // derive other advertise addresses from the advertise address advertiseAddrLAN := b.makeIPAddr(b.expandFirstIP("advertise_addr", c.AdvertiseAddrLAN), advertiseAddr) advertiseAddrWAN := b.makeIPAddr(b.expandFirstIP("advertise_addr_wan", c.AdvertiseAddrWAN), advertiseAddrLAN) rpcAdvertiseAddr := &net.TCPAddr{IP: advertiseAddrLAN.IP, Port: serverPort} serfAdvertiseAddrLAN := &net.TCPAddr{IP: advertiseAddrLAN.IP, Port: serfPortLAN} - serfAdvertiseAddrWAN := &net.TCPAddr{IP: advertiseAddrWAN.IP, Port: serfPortWAN} + // Only initialize serf WAN advertise address when its enabled + var serfAdvertiseAddrWAN *net.TCPAddr + if serfPortWAN >= 0 { + serfAdvertiseAddrWAN = &net.TCPAddr{IP: advertiseAddrWAN.IP, Port: serfPortWAN} + } // determine client addresses clientAddrs := b.expandIPs("client_addr", c.ClientAddr) @@ -869,8 +875,11 @@ func (b *Builder) Validate(rt RuntimeConfig) error { if err := addrUnique(inuse, "Serf Advertise LAN", rt.SerfAdvertiseAddrLAN); err != nil { return err } - if err := addrUnique(inuse, "Serf Advertise WAN", rt.SerfAdvertiseAddrWAN); err != nil { - return err + // Validate serf WAN advertise address only when its set + if rt.SerfAdvertiseAddrWAN != nil { + if err := addrUnique(inuse, "Serf Advertise WAN", rt.SerfAdvertiseAddrWAN); err != nil { + return err + } } if b.err != nil { return b.err diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 2f58b97813..633c9cf654 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -1065,7 +1065,7 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { }, }, { - desc: "serf wan port > 0", + desc: "allow disabling serf wan port", args: []string{`-data-dir=` + dataDir}, json: []string{`{ "ports": { @@ -1079,7 +1079,17 @@ func TestConfigFlagsAndEdgecases(t *testing.T) { } advertise_addr_wan = "1.2.3.4" `}, - err: "ports.serf_wan must be a valid port from 1 to 65535", + patch: func(rt *RuntimeConfig) { + rt.AdvertiseAddrWAN = ipAddr("1.2.3.4") + rt.SerfAdvertiseAddrWAN = nil + rt.SerfBindAddrWAN = nil + rt.TaggedAddresses = map[string]string{ + "lan": "10.0.0.1", + "wan": "1.2.3.4", + } + rt.DataDir = dataDir + rt.SerfPortWAN = -1 + }, }, { desc: "serf bind address lan template", diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index ca92fde575..5cfb3ff513 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -170,6 +170,10 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error { return err } + if len(dcs) == 0 { // no WAN federation, so return the local data center name + dcs = []string{c.srv.config.Datacenter} + } + *reply = dcs return nil } diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 5dd77d88f4..55abeec55b 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -129,7 +129,7 @@ func (m *Internal) KeyringOperation( } // Only perform WAN keyring querying and RPC forwarding once - if !args.Forwarded { + if !args.Forwarded && m.srv.serfWAN != nil { args.Forwarded = true m.executeKeyringOp(args, reply, true) return m.srv.globalRPC("Internal.KeyringOperation", args, reply) diff --git a/agent/consul/server.go b/agent/consul/server.go index 831e0973af..128f67081c 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -32,7 +32,6 @@ import ( "github.com/hashicorp/consul/types" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" - "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" ) @@ -75,6 +74,10 @@ const ( raftRemoveGracePeriod = 5 * time.Second ) +var ( + ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled") +) + // Server is Consul server which manages the service discovery, // health checking, DC forwarding, Raft, and multiple Serf pools. type Server struct { @@ -344,21 +347,23 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* // created, so we can pull it out from there reliably, even though it's // a little gross to be reading the updated config. - // Initialize the WAN Serf. - serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort - s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener) - if err != nil { - s.Shutdown() - return nil, fmt.Errorf("Failed to start WAN Serf: %v", err) - } - - // See big comment above why we are doing this. - if serfBindPortWAN == 0 { + // Initialize the WAN Serf if enabled + serfBindPortWAN := -1 + if config.SerfWANConfig != nil { serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort - if serfBindPortWAN == 0 { - return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf") + s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener) + if err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to start WAN Serf: %v", err) + } + // See big comment above why we are doing this. + if serfBindPortWAN == 0 { + serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort + if serfBindPortWAN == 0 { + return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf") + } + s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN) } - s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN) } // Initialize the LAN segments before the default LAN Serf so we have @@ -380,20 +385,22 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* s.floodSegments(config) // Add a "static route" to the WAN Serf and hook it up to Serf events. - if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil { - s.Shutdown() - return nil, fmt.Errorf("Failed to add WAN serf route: %v", err) - } - go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN) - - // Fire up the LAN <-> WAN join flooder. - portFn := func(s *metadata.Server) (int, bool) { - if s.WanJoinPort > 0 { - return s.WanJoinPort, true + if s.serfWAN != nil { + if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to add WAN serf route: %v", err) } - return 0, false + go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN) + + // Fire up the LAN <-> WAN join flooder. + portFn := func(s *metadata.Server) (int, bool) { + if s.WanJoinPort > 0 { + return s.WanJoinPort, true + } + return 0, false + } + go s.Flood(nil, portFn, s.serfWAN) } - go s.Flood(nil, portFn, s.serfWAN) // Start monitoring leadership. This must happen after Serf is set up // since it can fire events when leadership is obtained. @@ -831,6 +838,9 @@ func (s *Server) JoinLAN(addrs []string) (int, error) { // The target address should be another node listening on the // Serf WAN address func (s *Server) JoinWAN(addrs []string) (int, error) { + if s.serfWAN == nil { + return 0, ErrWANFederationDisabled + } return s.serfWAN.Join(addrs, true) } @@ -846,6 +856,9 @@ func (s *Server) LANMembers() []serf.Member { // WANMembers is used to return the members of the LAN cluster func (s *Server) WANMembers() []serf.Member { + if s.serfWAN == nil { + return nil + } return s.serfWAN.Members() } @@ -854,8 +867,10 @@ func (s *Server) RemoveFailedNode(node string) error { if err := s.serfLAN.RemoveFailedNode(node); err != nil { return err } - if err := s.serfWAN.RemoveFailedNode(node); err != nil { - return err + if s.serfWAN != nil { + if err := s.serfWAN.RemoveFailedNode(node); err != nil { + return err + } } return nil } @@ -877,7 +892,11 @@ func (s *Server) KeyManagerWAN() *serf.KeyManager { // Encrypted determines if gossip is encrypted func (s *Server) Encrypted() bool { - return s.serfLAN.EncryptionEnabled() && s.serfWAN.EncryptionEnabled() + LANEncrypted := s.serfLAN.EncryptionEnabled() + if s.serfWAN == nil { + return LANEncrypted + } + return LANEncrypted && s.serfWAN.EncryptionEnabled() } // LANSegments returns a map of LAN segments by name @@ -995,9 +1014,11 @@ func (s *Server) Stats() map[string]map[string]string { }, "raft": s.raft.Stats(), "serf_lan": s.serfLAN.Stats(), - "serf_wan": s.serfWAN.Stats(), "runtime": runtimeStats(), } + if s.serfWAN != nil { + stats["serf_wan"] = s.serfWAN.Stats() + } return stats } @@ -1019,11 +1040,6 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) { return cs, nil } -// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool. -func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) { - return s.serfWAN.GetCoordinate() -} - // Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write func (s *Server) setConsistentReadReady() { atomic.StoreInt32(&s.readyForConsistentReads, 1) diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 325b4c3321..5d46b74ee4 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -37,7 +37,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter) } else { conf.NodeName = s.config.NodeName - conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort) + if wanPort > 0 { + conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort) + } } conf.Tags["role"] = "consul" conf.Tags["dc"] = s.config.Datacenter