mirror of https://github.com/hashicorp/consul
consul: Use serf event to announce new leader
parent
e116815e17
commit
7123c6315b
|
@ -9,6 +9,7 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -178,6 +179,7 @@ func (c *Client) lanEventHandler() {
|
||||||
case serf.EventMemberFailed:
|
case serf.EventMemberFailed:
|
||||||
c.nodeFail(e.(serf.MemberEvent))
|
c.nodeFail(e.(serf.MemberEvent))
|
||||||
case serf.EventUser:
|
case serf.EventUser:
|
||||||
|
c.localEvent(e.(serf.UserEvent))
|
||||||
default:
|
default:
|
||||||
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
|
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
|
||||||
}
|
}
|
||||||
|
@ -250,6 +252,26 @@ func (c *Client) nodeFail(me serf.MemberEvent) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// localEvent is called when we receive an event on the local Serf
|
||||||
|
func (c *Client) localEvent(event serf.UserEvent) {
|
||||||
|
// Handle only consul events
|
||||||
|
if !strings.HasPrefix(event.Name, "consul:") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch event.Name {
|
||||||
|
case newLeaderEvent:
|
||||||
|
c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
|
||||||
|
|
||||||
|
// Trigger the callback
|
||||||
|
if c.config.ServerUp != nil {
|
||||||
|
c.config.ServerUp()
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// RPC is used to forward an RPC call to a consul server, or fail if no servers
|
// RPC is used to forward an RPC call to a consul server, or fail if no servers
|
||||||
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
||||||
// Check the last rpc time
|
// Check the last rpc time
|
||||||
|
|
|
@ -13,6 +13,7 @@ const (
|
||||||
SerfCheckName = "Serf Health Status"
|
SerfCheckName = "Serf Health Status"
|
||||||
ConsulServiceID = "consul"
|
ConsulServiceID = "consul"
|
||||||
ConsulServiceName = "consul"
|
ConsulServiceName = "consul"
|
||||||
|
newLeaderEvent = "consul:new-leader"
|
||||||
)
|
)
|
||||||
|
|
||||||
// monitorLeadership is used to monitor if we acquire or lose our role
|
// monitorLeadership is used to monitor if we acquire or lose our role
|
||||||
|
@ -42,6 +43,12 @@ func (s *Server) monitorLeadership() {
|
||||||
// leaderLoop runs as long as we are the leader to run various
|
// leaderLoop runs as long as we are the leader to run various
|
||||||
// maintence activities
|
// maintence activities
|
||||||
func (s *Server) leaderLoop(stopCh chan struct{}) {
|
func (s *Server) leaderLoop(stopCh chan struct{}) {
|
||||||
|
// Fire a user event indicating a new leader
|
||||||
|
payload := []byte(s.config.NodeName)
|
||||||
|
if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil {
|
||||||
|
s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Reconcile channel is only used once initial reconcile
|
// Reconcile channel is only used once initial reconcile
|
||||||
// has succeeded
|
// has succeeded
|
||||||
var reconcileCh chan serf.Member
|
var reconcileCh chan serf.Member
|
||||||
|
|
|
@ -3,6 +3,7 @@ package consul
|
||||||
import (
|
import (
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
"net"
|
"net"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
// lanEventHandler is used to handle events from the lan Serf cluster
|
// lanEventHandler is used to handle events from the lan Serf cluster
|
||||||
|
@ -18,6 +19,7 @@ func (s *Server) lanEventHandler() {
|
||||||
case serf.EventMemberFailed:
|
case serf.EventMemberFailed:
|
||||||
s.localMemberEvent(e.(serf.MemberEvent))
|
s.localMemberEvent(e.(serf.MemberEvent))
|
||||||
case serf.EventUser:
|
case serf.EventUser:
|
||||||
|
s.localEvent(e.(serf.UserEvent))
|
||||||
default:
|
default:
|
||||||
s.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
|
s.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
|
||||||
}
|
}
|
||||||
|
@ -68,6 +70,26 @@ func (s *Server) localMemberEvent(me serf.MemberEvent) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// localEvent is called when we receive an event on the local Serf
|
||||||
|
func (s *Server) localEvent(event serf.UserEvent) {
|
||||||
|
// Handle only consul events
|
||||||
|
if !strings.HasPrefix(event.Name, "consul:") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch event.Name {
|
||||||
|
case newLeaderEvent:
|
||||||
|
s.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
|
||||||
|
|
||||||
|
// Trigger the callback
|
||||||
|
if s.config.ServerUp != nil {
|
||||||
|
s.config.ServerUp()
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
s.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// remoteJoin is used to handle join events on the wan serf cluster
|
// remoteJoin is used to handle join events on the wan serf cluster
|
||||||
func (s *Server) remoteJoin(me serf.MemberEvent) {
|
func (s *Server) remoteJoin(me serf.MemberEvent) {
|
||||||
for _, m := range me.Members {
|
for _, m := range me.Members {
|
||||||
|
|
Loading…
Reference in New Issue