@@ -223,8 +223,8 @@ type queries struct {
 }
 
 const (
-	UserEventSizeLimit = 512 // Maximum byte size for event name and payload
 	snapshotSizeLimit  = 128 * 1024 // Maximum 128 KB snapshot
+	UserEventSizeLimit = 9 * 1024 // Maximum 9 KB for event name and payload
 )
 
 // Create creates a new Serf instance, starting all the background tasks
@@ -242,6 +242,10 @@ func Create(conf *Config) (*Serf, error) {
 			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
 	}
 
+	if conf.UserEventSizeLimit > UserEventSizeLimit {
+		return nil, fmt.Errorf("user event size limit exceeds limit of %d bytes", UserEventSizeLimit)
+	}
+
 	logger := conf.Logger
 	if logger == nil {
 		logOutput := conf.LogOutput
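
With this change the event size budget becomes tunable per cluster: Config gains a UserEventSizeLimit field that Create validates against the hard 9 KB cap above, so misconfiguration fails fast instead of surfacing later on broadcast. A minimal wiring sketch, assuming the github.com/hashicorp/serf/serf import path and that DefaultConfig leaves the field at the old 512-byte default:

package main

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

func main() {
	conf := serf.DefaultConfig()

	// Raise the per-event budget above the 512-byte default, staying
	// under the hard UserEventSizeLimit (9 KB) cap that Create enforces.
	conf.UserEventSizeLimit = 4 * 1024

	s, err := serf.Create(conf)
	if err != nil {
		log.Fatalf("serf.Create: %v", err) // e.g. a limit above the 9 KB cap
	}
	defer s.Shutdown()
}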
@@ -437,14 +441,25 @@ func (s *Serf) KeyManager() *KeyManager {
 }
 
 // UserEvent is used to broadcast a custom user event with a given
-// name and payload. The events must be fairly small, and if the
-// size limit is exceeded and error will be returned. If coalesce is enabled,
-// nodes are allowed to coalesce this event. Coalescing is only available
-// starting in v0.2
+// name and payload. If the configured size limit is exceeded, an error
+// will be returned. If coalesce is enabled, nodes are allowed to
+// coalesce this event. Coalescing is only available starting in v0.2.
 func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error {
-	// Check the size limit
-	if len(name)+len(payload) > UserEventSizeLimit {
-		return fmt.Errorf("user event exceeds limit of %d bytes", UserEventSizeLimit)
+	payloadSizeBeforeEncoding := len(name) + len(payload)
+
+	// Check the size before encoding to avoid needless work, returning early if the event is over the specified limit.
+	if payloadSizeBeforeEncoding > s.config.UserEventSizeLimit {
+		return fmt.Errorf(
+			"user event exceeds configured limit of %d bytes before encoding",
+			s.config.UserEventSizeLimit,
+		)
+	}
+
+	if payloadSizeBeforeEncoding > UserEventSizeLimit {
+		return fmt.Errorf(
+			"user event exceeds sane limit of %d bytes before encoding",
+			UserEventSizeLimit,
+		)
 	}
 
 	// Create a message
@@ -454,16 +469,34 @@ func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error {
 		Payload: payload,
 		CC:      coalesce,
 	}
-	s.eventClock.Increment()
-
-	// Process update locally
-	s.handleUserEvent(&msg)
 
 	// Start broadcasting the event
 	raw, err := encodeMessage(messageUserEventType, &msg)
 	if err != nil {
 		return err
 	}
+
+	// Check the size after encoding to be sure again that
+	// we're not attempting to send over the specified size limit.
+	if len(raw) > s.config.UserEventSizeLimit {
+		return fmt.Errorf(
+			"encoded user event exceeds configured limit of %d bytes after encoding",
+			s.config.UserEventSizeLimit,
+		)
+	}
+
+	if len(raw) > UserEventSizeLimit {
+		return fmt.Errorf(
+			"encoded user event exceeds sane limit of %d bytes after encoding",
+			UserEventSizeLimit,
+		)
+	}
+
+	s.eventClock.Increment()
+
+	// Process update locally
+	s.handleUserEvent(&msg)
+
 	s.eventBroadcasts.QueueBroadcast(&broadcast{
 		msg: raw,
 	})
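
The result is a two-stage guard: a cheap length check on name plus payload before encoding, and a second check on the encoded message, since encoding adds overhead on top of the raw bytes. A usage sketch against a running *serf.Serf (the "deploy" event name and the helper are hypothetical):

import (
	"fmt"

	"github.com/hashicorp/serf/serf"
)

// fireDeploy broadcasts a custom event and surfaces the size-limit
// errors that UserEvent can now return at either stage.
func fireDeploy(s *serf.Serf, payload []byte) error {
	// coalesce=false: deliver every event instead of letting nodes
	// collapse successive events that share a name.
	if err := s.UserEvent("deploy", payload, false); err != nil {
		return fmt.Errorf("broadcast failed: %w", err)
	}
	return nil
}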
@@ -748,15 +781,26 @@ func (s *Serf) Members() []Member {
 	return members
 }
 
-// RemoveFailedNode forcibly removes a failed node from the cluster
+// RemoveFailedNode is a backwards-compatible form
+// of forceLeave.
+func (s *Serf) RemoveFailedNode(node string) error {
+	return s.forceLeave(node, false)
+}
+
+func (s *Serf) RemoveFailedNodePrune(node string) error {
+	return s.forceLeave(node, true)
+}
+
+// forceLeave forcibly removes a failed node from the cluster
 // immediately, instead of waiting for the reaper to eventually reclaim it.
 // This also has the effect that Serf will no longer attempt to reconnect
 // to this node.
-func (s *Serf) RemoveFailedNode(node string) error {
+func (s *Serf) forceLeave(node string, prune bool) error {
 	// Construct the message to broadcast
 	msg := messageLeave{
 		LTime: s.clock.Time(),
 		Node:  node,
+		Prune: prune,
 	}
 	s.clock.Increment()
 
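
The split keeps the public API stable: RemoveFailedNode retains its old semantics (the leave intent is broadcast and the reaper eventually reclaims the member), while RemoveFailedNodePrune sets the new Prune flag so peers erase the member immediately. A sketch of choosing between them (the evict helper is hypothetical):

import "github.com/hashicorp/serf/serf"

// evict removes a failed node, optionally erasing it from the member
// list right away instead of leaving it for the periodic reaper.
func evict(s *serf.Serf, node string, prune bool) error {
	if prune {
		// Broadcasts the leave intent with Prune set; receivers run
		// handlePrune and erase the member immediately.
		return s.RemoveFailedNodePrune(node)
	}
	return s.RemoveFailedNode(node)
}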
@@ -1027,6 +1071,7 @@ func (s *Serf) handleNodeUpdate(n *memberlist.Node) {
 
 // handleNodeLeaveIntent is called when an intent to leave is received.
 func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
 	// Witness a potentially newer time
 	s.clock.Witness(leaveMsg.LTime)
@@ -1057,6 +1102,10 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
 	case StatusAlive:
 		member.Status = StatusLeaving
 		member.statusLTime = leaveMsg.LTime
+
+		if leaveMsg.Prune {
+			s.handlePrune(member)
+		}
 		return true
 	case StatusFailed:
 		member.Status = StatusLeft
@@ -1065,6 +1114,7 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
 
 		// Remove from the failed list and add to the left list. We add
 		// to the left list so that when we do a sync, other nodes will
 		// remove it from their failed list.
+
 		s.failedMembers = removeOldMember(s.failedMembers, member.Name)
 		s.leftMembers = append(s.leftMembers, member)
@@ -1079,12 +1129,40 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
 				Members: []Member{member.Member},
 			}
 		}
+
+		if leaveMsg.Prune {
+			s.handlePrune(member)
+		}
+
 		return true
+
+	case StatusLeaving, StatusLeft:
+		if leaveMsg.Prune {
+			s.handlePrune(member)
+		}
+		return true
 	default:
 		return false
 	}
 }
 
+// handlePrune waits for nodes that are leaving and then forcibly
+// erases a member from the list of members.
+func (s *Serf) handlePrune(member *memberState) {
+	if member.Status == StatusLeaving {
+		time.Sleep(s.config.BroadcastTimeout + s.config.LeavePropagateDelay)
+	}
+
+	s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr)
+
+	// If we are leaving or left we may be in that list of members
+	if member.Status == StatusLeaving || member.Status == StatusLeft {
+		s.leftMembers = removeOldMember(s.leftMembers, member.Name)
+	}
+	s.eraseNode(member)
+
+}
+
 // handleNodeJoinIntent is called when a node broadcasts a
 // join message to set the lamport time of its join
 func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
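
Note that handlePrune blocks for BroadcastTimeout plus LeavePropagateDelay when the member is still in StatusLeaving, giving the leave intent time to propagate before the member is erased. Those two knobs therefore bound how long a prune can stall; a tuning sketch (the values are illustrative, not recommendations):

import (
	"time"

	"github.com/hashicorp/serf/serf"
)

func pruneTunedConfig() *serf.Config {
	conf := serf.DefaultConfig()
	// Upper bound on the handlePrune sleep for a still-leaving member:
	// BroadcastTimeout + LeavePropagateDelay.
	conf.BroadcastTimeout = 5 * time.Second
	conf.LeavePropagateDelay = 1 * time.Second
	return conf
}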
@@ -1405,6 +1483,30 @@ func (s *Serf) resolveNodeConflict() {
 	}
 }
 
+// eraseNode takes a node completely out of the member list
+func (s *Serf) eraseNode(m *memberState) {
+	// Delete from members
+	delete(s.members, m.Name)
+
+	// Tell the coordinate client the node has gone away and delete
+	// its cached coordinates.
+	if !s.config.DisableCoordinates {
+		s.coordClient.ForgetNode(m.Name)
+
+		s.coordCacheLock.Lock()
+		delete(s.coordCache, m.Name)
+		s.coordCacheLock.Unlock()
+	}
+
+	// Send an event along
+	if s.config.EventCh != nil {
+		s.config.EventCh <- MemberEvent{
+			Type:    EventMemberReap,
+			Members: []Member{m.Member},
+		}
+	}
+}
+
 // handleReap periodically reaps the list of failed and left members, as well
 // as old buffered intents.
 func (s *Serf) handleReap() {
@@ -1455,27 +1557,10 @@ func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []
 		n--
 		i--
 
-		// Delete from members
-		delete(s.members, m.Name)
-
-		// Tell the coordinate client the node has gone away and delete
-		// its cached coordinates.
-		if !s.config.DisableCoordinates {
-			s.coordClient.ForgetNode(m.Name)
-
-			s.coordCacheLock.Lock()
-			delete(s.coordCache, m.Name)
-			s.coordCacheLock.Unlock()
-		}
-
-		// Send an event along
+		// Delete from members and send out event
 		s.logger.Printf("[INFO] serf: EventMemberReap: %s", m.Name)
-		if s.config.EventCh != nil {
-			s.config.EventCh <- MemberEvent{
-				Type:    EventMemberReap,
-				Members: []Member{m.Member},
-			}
-		}
+		s.eraseNode(m)
+
 	}
 	return old
 }
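
After this refactor the periodic reaper and the forced prune converge on eraseNode, so consumers observe the same EventMemberReap regardless of which path removed the member. A sketch of draining it from the event channel configured via Config.EventCh:

import (
	"log"

	"github.com/hashicorp/serf/serf"
)

// watchReaps logs every member that eraseNode reaps, whether via the
// periodic reaper or a forced prune.
func watchReaps(events <-chan serf.Event) {
	for e := range events {
		if me, ok := e.(serf.MemberEvent); ok && me.Type == serf.EventMemberReap {
			for _, m := range me.Members {
				log.Printf("reaped member: %s", m.Name)
			}
		}
	}
}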