Update Serf and memberlist (#4511)

This includes fixes that improve gossip scalability on very large (> 10k node) clusters.

The Serf changes:
 - take snapshot disk IO out of the critical path for handling messages (hashicorp/serf#524)
 - make snapshot compaction much less aggressive: the old fixed threshold meant that on clusters larger than about 2000 nodes the snapshot was being compacted constantly, synchronously with request handling (hashicorp/serf#525). A minimal sketch of the new size-proportional threshold follows this list.
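A sketch of the size-proportional compaction trigger, in Go. The constants mirror the serf diff further down; the shouldCompact helper and the main driver are illustrative, not part of the change:

```go
package main

import "fmt"

const (
	snapshotBytesPerNode        = 128 // rough per-node size estimate in the snapshot file
	snapshotCompactionThreshold = 2   // let the file grow to 2x the estimate before compacting
)

// shouldCompact reports whether a snapshot file of offset bytes, tracking
// aliveNodes members, is due for compaction. minCompactSize is a floor so
// small clusters are not compacted constantly.
func shouldCompact(offset, minCompactSize int64, aliveNodes int) bool {
	threshold := int64(aliveNodes) * snapshotBytesPerNode * snapshotCompactionThreshold
	if threshold < minCompactSize {
		threshold = minCompactSize
	}
	return offset > threshold
}

func main() {
	// A 10k-node cluster with a 1 MB floor only compacts once the file passes
	// 10000 * 128 * 2 = 2,560,000 bytes (~2.5 MB).
	fmt.Println(shouldCompact(3<<20, 1<<20, 10000)) // true
	fmt.Println(shouldCompact(1<<20, 1<<20, 10000)) // false
}
```

The actual change implements this in snapshotMaxSize (see the serf/snapshot.go hunks below), which reads the live node count from the Snapshotter's aliveNodes map and uses the old maxSize parameter, renamed minCompactSize, as the floor.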

Memberlist changes:
 - prioritize handling alive messages over suspect/dead to improve stability, and drain the handoff queue in LIFO order so we don't act on information that's already stale by the time we handle it (hashicorp/memberlist#159)
 - limit the number of pushPull requests being handled concurrently to 128. In one test scenario with tens of thousands of servers, channel and lock blocking caused over 3000 pushPulls to be in flight at once, ballooning the server's memory: each one held a deserialised list of all 10k+ known nodes and their tags, roughly 60 million objects and 7GB. The other fixes here should keep the original root cause from blocking in the same way, but this cap prevents any other bug or source of contention from letting pushPull messages stack up and eat resources (hashicorp/memberlist#158). A minimal sketch of this guard follows the list.
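A sketch of the concurrency guard, assuming a stand-in server type and handler; maxPushPullRequests, the pushPullReq counter, and the atomic increment/decrement mirror the memberlist diff below:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const maxPushPullRequests = 128 // cap on push/pull handlers in flight

type server struct {
	pushPullReq uint32 // number of push/pull handlers currently running
}

// handleConn counts itself against the cap before doing any expensive work,
// and rejects the request if the cap has already been reached.
func (s *server) handleConn() error {
	n := atomic.AddUint32(&s.pushPullReq, 1)
	defer atomic.AddUint32(&s.pushPullReq, ^uint32(0)) // atomic decrement on return

	if n >= maxPushPullRequests {
		return fmt.Errorf("too many pending push/pull requests (%d)", n)
	}
	// ... read the remote state, merge it, and send ours back ...
	return nil
}

func main() {
	s := &server{}
	fmt.Println(s.handleConn()) // <nil>: well under the cap
}
```

Because the counter is decremented on return even when the request is rejected, a transient spike simply sheds load at the cap instead of accumulating gigabytes of decoded node state.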
pull/4517/head
Paul Banks, committed by Matt Keeler
parent c88900aaa9
commit 9ce10769ce

@@ -16,4 +16,4 @@ deps:
go get -d -v ./...
echo $(DEPS) | xargs -n1 go get -d
.PNONY: test cov integ
.PHONY: test cov integ

@@ -65,7 +65,7 @@ For complete documentation, see the associated [Godoc](http://godoc.org/github.c
## Protocol
memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf). However, we extend the protocol in a number of ways:
memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://ieeexplore.ieee.org/document/1028914/). However, we extend the protocol in a number of ways:
* Several extensions are made to increase propagation speed and
convergence rate.

@@ -15,6 +15,7 @@ multiple routes.
package memberlist
import (
"container/list"
"fmt"
"log"
"net"
@@ -34,6 +35,7 @@ type Memberlist struct {
sequenceNum uint32 // Local sequence number
incarnation uint32 // Local incarnation number
numNodes uint32 // Number of known nodes (estimate)
pushPullReq uint32 // Number of push/pull requests
config *Config
shutdown int32 // Used as an atomic boolean value
@@ -45,7 +47,11 @@ type Memberlist struct {
leaveLock sync.Mutex // Serializes calls to Leave
transport Transport
handoff chan msgHandoff
handoffCh chan struct{}
highPriorityMsgQueue *list.List
lowPriorityMsgQueue *list.List
msgQueueLock sync.Mutex
nodeLock sync.RWMutex
nodes []*nodeState // Known nodes
@@ -160,17 +166,19 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
}
m := &Memberlist{
config: conf,
shutdownCh: make(chan struct{}),
leaveBroadcast: make(chan struct{}, 1),
transport: transport,
handoff: make(chan msgHandoff, conf.HandoffQueueDepth),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
ackHandlers: make(map[uint32]*ackHandler),
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
logger: logger,
config: conf,
shutdownCh: make(chan struct{}),
leaveBroadcast: make(chan struct{}, 1),
transport: transport,
handoffCh: make(chan struct{}, 1),
highPriorityMsgQueue: list.New(),
lowPriorityMsgQueue: list.New(),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
ackHandlers: make(map[uint32]*ackHandler),
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
logger: logger,
}
m.broadcasts.NumNodes = func() int {
return m.estNumNodes()
@@ -639,7 +647,9 @@ func (m *Memberlist) Shutdown() error {
// Shut down the transport first, which should block until it's
// completely torn down. If we kill the memberlist-side handlers
// those I/O handlers might get stuck.
m.transport.Shutdown()
if err := m.transport.Shutdown(); err != nil {
m.logger.Printf("[ERR] Failed to shutdown transport: %v", err)
}
// Now tear down everything else.
atomic.StoreInt32(&m.shutdown, 1)

@@ -8,9 +8,10 @@ import (
"hash/crc32"
"io"
"net"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
)
@@ -71,7 +72,8 @@ const (
compoundOverhead = 2 // Assumed overhead per entry in compoundHeader
userMsgOverhead = 1
blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process
maxPushStateBytes = 10 * 1024 * 1024
maxPushStateBytes = 20 * 1024 * 1024
maxPushPullRequests = 128 // Maximum number of concurrent push/pull requests
)
// ping request sent directly to node
@@ -238,6 +240,16 @@ func (m *Memberlist) handleConn(conn net.Conn) {
m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn))
}
case pushPullMsg:
// Increment counter of pending push/pulls
numConcurrent := atomic.AddUint32(&m.pushPullReq, 1)
defer atomic.AddUint32(&m.pushPullReq, ^uint32(0))
// Check if we have too many open push/pull requests
if numConcurrent >= maxPushPullRequests {
m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests")
return
}
join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn))
@@ -357,10 +369,25 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
case deadMsg:
fallthrough
case userMsg:
// Determine the message queue, prioritize alive
queue := m.lowPriorityMsgQueue
if msgType == aliveMsg {
queue = m.highPriorityMsgQueue
}
// Check for overflow and append if not full
m.msgQueueLock.Lock()
if queue.Len() >= m.config.HandoffQueueDepth {
m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
} else {
queue.PushBack(msgHandoff{msgType, buf, from})
}
m.msgQueueLock.Unlock()
// Notify of pending message
select {
case m.handoff <- msgHandoff{msgType, buf, from}:
case m.handoffCh <- struct{}{}:
default:
m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
}
default:
@@ -368,28 +395,51 @@ func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Tim
}
}
// getNextMessage returns the next message to process in priority order, using LIFO
func (m *Memberlist) getNextMessage() (msgHandoff, bool) {
m.msgQueueLock.Lock()
defer m.msgQueueLock.Unlock()
if el := m.highPriorityMsgQueue.Back(); el != nil {
m.highPriorityMsgQueue.Remove(el)
msg := el.Value.(msgHandoff)
return msg, true
} else if el := m.lowPriorityMsgQueue.Back(); el != nil {
m.lowPriorityMsgQueue.Remove(el)
msg := el.Value.(msgHandoff)
return msg, true
}
return msgHandoff{}, false
}
// packetHandler is a long running goroutine that processes messages received
// over the packet interface, but is decoupled from the listener to avoid
// blocking the listener which may cause ping/ack messages to be delayed.
func (m *Memberlist) packetHandler() {
for {
select {
case msg := <-m.handoff:
msgType := msg.msgType
buf := msg.buf
from := msg.from
switch msgType {
case suspectMsg:
m.handleSuspect(buf, from)
case aliveMsg:
m.handleAlive(buf, from)
case deadMsg:
m.handleDead(buf, from)
case userMsg:
m.handleUser(buf, from)
default:
m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
case <-m.handoffCh:
for {
msg, ok := m.getNextMessage()
if !ok {
break
}
msgType := msg.msgType
buf := msg.buf
from := msg.from
switch msgType {
case suspectMsg:
m.handleSuspect(buf, from)
case aliveMsg:
m.handleAlive(buf, from)
case deadMsg:
m.handleDead(buf, from)
case userMsg:
m.handleUser(buf, from)
default:
m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
}
}
case <-m.shutdownCh:
@@ -1059,7 +1109,7 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
// operations, given the deadline. The bool return parameter is true if we
// were able to round trip a ping to the other node.
func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) {
conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
conn, err := m.transport.DialTimeout(addr, deadline.Sub(time.Now()))
if err != nil {
// If the node is actually dead we expect this to fail, so we
// shouldn't spam the logs with it. After this point, errors
@@ -1094,7 +1144,7 @@ func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time
}
if ack.SeqNo != ping.SeqNo {
return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn))
return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo)
}
return true, nil

@@ -6,6 +6,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)
// delegate is the memberlist.Delegate implementation that Serf uses.
@@ -13,6 +14,8 @@ type delegate struct {
serf *Serf
}
var _ memberlist.Delegate = &delegate{}
func (d *delegate) NodeMeta(limit int) []byte {
roleBytes := d.serf.encodeTags(d.serf.config.Tags)
if len(roleBytes) > limit {

@@ -189,4 +189,4 @@ func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse,
defer k.l.RUnlock()
return k.handleKeyRequest("", listKeysQuery, opts)
}
}

@@ -1331,7 +1331,7 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
// handleNodeConflict is invoked when a join detects a conflict over a name.
// This means two different nodes (IP/Port) are claiming the same name. Memberlist
// will reject the "new" node mapping, but we can still be notified
// will reject the "new" node mapping, but we can still be notified.
func (s *Serf) handleNodeConflict(existing, other *memberlist.Node) {
// Log a basic warning if the node is not us...
if existing.Name != s.config.NodeName {

@@ -25,10 +25,34 @@ nodes to re-join, as well as restore our clock values to avoid replaying
old events.
*/
const flushInterval = 500 * time.Millisecond
const clockUpdateInterval = 500 * time.Millisecond
const tmpExt = ".compact"
const snapshotErrorRecoveryInterval = 30 * time.Second
const (
// flushInterval is how often we force a flush of the snapshot file
flushInterval = 500 * time.Millisecond
// clockUpdateInterval is how often we fetch the current lamport time of the cluster and write to the snapshot file
clockUpdateInterval = 500 * time.Millisecond
// tmpExt is the extension we use for the temporary file during compaction
tmpExt = ".compact"
// snapshotErrorRecoveryInterval is how often we attempt to recover from
// errors writing to the snapshot file.
snapshotErrorRecoveryInterval = 30 * time.Second
// eventChSize is the size of the event buffers between Serf and the
// consuming application. If this is exhausted we will block Serf and Memberlist.
eventChSize = 2048
// shutdownFlushTimeout is the time limit to write pending events to the snapshot during a shutdown
shutdownFlushTimeout = 250 * time.Millisecond
// snapshotBytesPerNode is an estimated bytes per node to snapshot
snapshotBytesPerNode = 128
// snapshotCompactionThreshold is the threshold we apply to
// the snapshot size estimate (nodes * bytes per node) before compacting.
snapshotCompactionThreshold = 2
)
// Snapshotter is responsible for ingesting events and persisting
// them to disk, and providing a recovery mechanism at start time.
@@ -38,6 +62,7 @@ type Snapshotter struct {
fh *os.File
buffered *bufio.Writer
inCh <-chan Event
streamCh chan Event
lastFlush time.Time
lastClock LamportTime
lastEventClock LamportTime
@@ -45,7 +70,7 @@ type Snapshotter struct {
leaveCh chan struct{}
leaving bool
logger *log.Logger
maxSize int64
minCompactSize int64
path string
offset int64
outCh chan<- Event
@@ -72,13 +97,14 @@ func (p PreviousNode) String() string {
// Setting rejoinAfterLeave makes leave not clear the state, and can be used
// if you intend to rejoin the same cluster after a leave.
func NewSnapshotter(path string,
maxSize int,
minCompactSize int,
rejoinAfterLeave bool,
logger *log.Logger,
clock *LamportClock,
outCh chan<- Event,
shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) {
inCh := make(chan Event, 1024)
inCh := make(chan Event, eventChSize)
streamCh := make(chan Event, eventChSize)
// Try to open the file
fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
@@ -101,12 +127,13 @@ func NewSnapshotter(path string,
fh: fh,
buffered: bufio.NewWriter(fh),
inCh: inCh,
streamCh: streamCh,
lastClock: 0,
lastEventClock: 0,
lastQueryClock: 0,
leaveCh: make(chan struct{}),
logger: logger,
maxSize: int64(maxSize),
minCompactSize: int64(minCompactSize),
path: path,
offset: offset,
outCh: outCh,
@@ -122,6 +149,7 @@ func NewSnapshotter(path string,
}
// Start handling new commands
go snap.teeStream()
go snap.stream()
return inCh, snap, nil
}
@@ -171,11 +199,69 @@ func (s *Snapshotter) Leave() {
}
}
// teeStream is a long running routine that is used to copy events
// to the output channel and the internal event handler.
func (s *Snapshotter) teeStream() {
flushEvent := func(e Event) {
// Forward to the internal stream, do not block
select {
case s.streamCh <- e:
default:
}
// Forward the event immediately, do not block
if s.outCh != nil {
select {
case s.outCh <- e:
default:
}
}
}
OUTER:
for {
select {
case e := <-s.inCh:
flushEvent(e)
case <-s.shutdownCh:
break OUTER
}
}
// Drain any remaining events before exiting
for {
select {
case e := <-s.inCh:
flushEvent(e)
default:
return
}
}
}
// stream is a long running routine that is used to handle events
func (s *Snapshotter) stream() {
clockTicker := time.NewTicker(clockUpdateInterval)
defer clockTicker.Stop()
// flushEvent is used to handle writing out an event
flushEvent := func(e Event) {
// Stop recording events after a leave is issued
if s.leaving {
return
}
switch typed := e.(type) {
case MemberEvent:
s.processMemberEvent(typed)
case UserEvent:
s.processUserEvent(typed)
case *Query:
s.processQuery(typed)
default:
s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e)
}
}
for {
select {
case <-s.leaveCh:
@@ -193,31 +279,32 @@ func (s *Snapshotter) stream() {
s.logger.Printf("[ERR] serf: failed to sync leave to snapshot: %v", err)
}
case e := <-s.inCh:
// Forward the event immediately
if s.outCh != nil {
s.outCh <- e
}
// Stop recording events after a leave is issued
if s.leaving {
continue
}
switch typed := e.(type) {
case MemberEvent:
s.processMemberEvent(typed)
case UserEvent:
s.processUserEvent(typed)
case *Query:
s.processQuery(typed)
default:
s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e)
}
case e := <-s.streamCh:
flushEvent(e)
case <-clockTicker.C:
s.updateClock()
case <-s.shutdownCh:
// Setup a timeout
flushTimeout := time.After(shutdownFlushTimeout)
// Snapshot the clock
s.updateClock()
// Clear out the buffers
FLUSH:
for {
select {
case e := <-s.streamCh:
flushEvent(e)
case <-flushTimeout:
break FLUSH
default:
break FLUSH
}
}
if err := s.buffered.Flush(); err != nil {
s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err)
}
@@ -321,12 +408,25 @@ func (s *Snapshotter) appendLine(l string) error {
// Check if a compaction is necessary
s.offset += int64(n)
if s.offset > s.maxSize {
if s.offset > s.snapshotMaxSize() {
return s.compact()
}
return nil
}
// snapshotMaxSize computes the maximum size and is used to force periodic compaction.
func (s *Snapshotter) snapshotMaxSize() int64 {
nodes := int64(len(s.aliveNodes))
estSize := nodes * snapshotBytesPerNode
threshold := estSize * snapshotCompactionThreshold
// Apply a minimum threshold to avoid frequent compaction
if threshold < s.minCompactSize {
threshold = s.minCompactSize
}
return threshold
}
// Compact is used to compact the snapshot once it is too large
func (s *Snapshotter) compact() error {
defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now())

@@ -99,12 +99,12 @@
{"path":"github.com/hashicorp/hil","checksumSHA1":"kqCMCHy2b+RBMKC+ER+OPqp8C3E=","revision":"1e86c6b523c55d1fa6c6e930ce80b548664c95c2","revisionTime":"2016-07-11T23:18:37Z"},
{"path":"github.com/hashicorp/hil/ast","checksumSHA1":"UICubs001+Q4MsUf9zl2vcMzWQQ=","revision":"1e86c6b523c55d1fa6c6e930ce80b548664c95c2","revisionTime":"2016-07-11T23:18:37Z"},
{"path":"github.com/hashicorp/logutils","checksumSHA1":"vt+P9D2yWDO3gdvdgCzwqunlhxU=","revision":"0dc08b1671f34c4250ce212759ebd880f743d883","revisionTime":"2015-06-09T07:04:31Z"},
{"path":"github.com/hashicorp/memberlist","checksumSHA1":"88DoUaWD6hS1KTt57RMQ7wxHu/k=","revision":"9bdd37bfb26bd039c08b0f36be6f80ceede4aaf3","revisionTime":"2017-11-17T04:34:18Z"},
{"path":"github.com/hashicorp/memberlist","checksumSHA1":"q6yTL5vSGnWxUtcocVU3YIG/HNc=","revision":"b195c8e4fcc6284fff1583fd6ab09e68ca207551","revisionTime":"2018-08-09T14:04:54Z"},
{"path":"github.com/hashicorp/net-rpc-msgpackrpc","checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3","revisionTime":"2015-11-16T02:03:38Z"},
{"path":"github.com/hashicorp/raft","checksumSHA1":"JjJtGJi1ywWhVhs/PvTXxe4TeD8=","revision":"6d14f0c70869faabd9e60ba7ed88a6cbbd6a661f","revisionTime":"2017-10-03T22:09:13Z","version":"v1.0.0","versionExact":"v1.0.0"},
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"4b67f2c2b2bb5b748d934a6d48221062e43d2274","revisionTime":"2018-05-04T20:06:40Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"QrT+nzyXsD/MmhTjjhcPdnALZ1I=","revision":"4b67f2c2b2bb5b748d934a6d48221062e43d2274","revisionTime":"2018-05-04T20:06:40Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"19bbd39e421bdf3559d5025fb2c760f5ffa56233","revisionTime":"2018-08-09T14:17:58Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"axdQxCEwvUr1AygfYIMMxPkS1pY=","revision":"19bbd39e421bdf3559d5025fb2c760f5ffa56233","revisionTime":"2018-08-09T14:17:58Z"},
{"path":"github.com/hashicorp/vault/api","checksumSHA1":"LYQZ+o7zJCda/6LibdN0spFco34=","revision":"533003e27840d9646cb4e7d23b3a113895da1dd0","revisionTime":"2018-06-20T14:55:40Z","version":"v0.10.3","versionExact":"v0.10.3"},
{"path":"github.com/hashicorp/vault/audit","checksumSHA1":"2JOC+Ur0S3U8Gqv2cfNB3zxgSBk=","revision":"c737968235c8673b872350f0a047877bee396342","revisionTime":"2018-06-20T16:45:32Z"},
{"path":"github.com/hashicorp/vault/builtin/logical/database/dbplugin","checksumSHA1":"RCwWixWwKG6j2vF9iVoxbCzo6p4=","revision":"c737968235c8673b872350f0a047877bee396342","revisionTime":"2018-06-20T16:45:32Z"},
