Makes the flood goroutine more reusable.

pull/2801/head
James Phillips 8 years ago
parent d556d14154
commit 7a451f728e
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11

@ -0,0 +1,68 @@
package consul
import (
"time"
"github.com/hashicorp/consul/consul/servers"
"github.com/hashicorp/serf/serf"
)
// FloodNotify lets all the waiting Flood goroutines know that some change may
// have affected them.
func (s *Server) FloodNotify() {
s.floodLock.RLock()
defer s.floodLock.RUnlock()
for _, ch := range s.floodCh {
select {
case ch <- struct{}{}:
default:
}
}
}
// Flood is a long-running goroutine that floods servers from the LAN to the
// given global Serf instance, such as the WAN. This will exit once either of
// the Serf instances are shut down.
func (s *Server) Flood(portFn servers.FloodPortFn, global *serf.Serf) {
s.floodLock.Lock()
floodCh := make(chan struct{})
s.floodCh = append(s.floodCh, floodCh)
s.floodLock.Unlock()
ticker := time.NewTicker(s.config.SerfFloodInterval)
defer ticker.Stop()
defer func() {
s.floodLock.Lock()
defer s.floodLock.Unlock()
for i, ch := range s.floodCh {
if ch == floodCh {
s.floodCh = append(s.floodCh[:i], s.floodCh[i+1:]...)
return
}
}
panic("flood channels out of sync")
}()
for {
WAIT:
select {
case <-s.serfLAN.ShutdownCh():
return
case <-global.ShutdownCh():
return
case <-ticker.C:
goto FLOOD
case <-floodCh:
goto FLOOD
}
goto WAIT
FLOOD:
servers.FloodJoins(s.logger, portFn, s.config.Datacenter, s.serfLAN, global)
}
}

@ -143,11 +143,8 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
s.maybeBootstrap()
}
// Kick the WAN flooder.
select {
case s.floodCh <- struct{}{}:
default:
}
// Kick the join flooders.
s.FloodNotify()
}
}

@ -156,9 +156,9 @@ type Server struct {
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
// floodCh is kicked whenever we should try to flood LAN servers into
// the WAN.
floodCh chan struct{}
// floodLock controls access to floodCh.
floodLock sync.RWMutex
floodCh []chan struct{}
// sessionTimers track the expiration time of each Session that has
// a TTL. On expiration, a SessionDestroy event will occur, and
@ -258,7 +258,6 @@ func NewServer(config *Config) (*Server, error) {
router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
floodCh: make(chan struct{}),
tombstoneGC: gc,
shutdownCh: make(chan struct{}),
}
@ -318,40 +317,15 @@ func NewServer(config *Config) (*Server, error) {
}
go servers.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
// Fire up the LAN <-> WAN Serf join flooder.
go func() {
ticker := time.NewTicker(config.SerfFloodInterval)
defer ticker.Stop()
portFn := func(s *agent.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
} else {
return 0, false
}
// Fire up the LAN <-> WAN join flooder.
portFn := func(s *agent.Server) (int, bool) {
if s.WanJoinPort > 0 {
return s.WanJoinPort, true
} else {
return 0, false
}
for {
WAIT:
select {
case <-s.serfLAN.ShutdownCh():
return
case <-s.serfWAN.ShutdownCh():
return
case <-ticker.C:
goto FLOOD
case <-s.floodCh:
goto FLOOD
}
goto WAIT
FLOOD:
servers.FloodJoins(s.logger, portFn, config.Datacenter, s.serfLAN, s.serfWAN)
}
}()
}
go s.Flood(portFn, s.serfWAN)
// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.

@ -72,9 +72,13 @@ func FloodJoins(logger *log.Logger, portFn FloodPortFn,
}
// Do the join!
if _, err := globalSerf.Join([]string{addr}, true); err != nil {
n, err := globalSerf.Join([]string{addr}, true)
if err != nil {
logger.Printf("[DEBUG] consul: Failed to flood-join %q at %s: %v",
server.Name, addr, err)
} else if n > 0 {
logger.Printf("[INFO] consul: Successfully performed flood-join for %q at %s",
server.Name, addr)
}
}
}

Loading…
Cancel
Save