update github.com/hashicorp/{serf,memberlist,go-sockaddr} (#5189)

This activates large-cluster improvements in the gossip layer from
R.B. Boyer 2019-01-07 15:00:47 -06:00 committed by GitHub
parent d4f7a830a8
commit b96391ecff
No known key found for this signature in database
13 changed files with 485 additions and 89 deletions

View File

@ -16,7 +16,7 @@ var (
// Centralize all regexps and regexp.Copy() where necessary.
signRE *regexp.Regexp = regexp.MustCompile(`^[\s]*[+-]`)
whitespaceRE *regexp.Regexp = regexp.MustCompile(`[\s]+`)
ifNameRE *regexp.Regexp = regexp.MustCompile(`^Ethernet adapter ([^\s:]+):`)
ifNameRE *regexp.Regexp = regexp.MustCompile(`^(?:Ethernet|Wireless LAN) adapter ([^:]+):`)
ipAddrRE *regexp.Regexp = regexp.MustCompile(`^ IPv[46] Address\. \. \. \. \. \. \. \. \. \. \. : ([^\s]+)`)

View File

@ -1,4 +1,5 @@
DEPS = $(go list -f '{{range .Imports}}{{.}} {{end}}' ./...)
DEPS := $(shell go list -f '{{range .Imports}}{{.}} {{end}}' ./...)
test: subnet
go test ./...
@ -13,7 +14,7 @@ cov:
open /tmp/coverage.html
go get -d -v ./...
go get -t -d -v ./...
echo $(DEPS) | xargs -n1 go get -d
.PHONY: test cov integ

View File

@ -1,4 +1,4 @@
# memberlist [![GoDoc](https://godoc.org/github.com/hashicorp/memberlist?status.png)](https://godoc.org/github.com/hashicorp/memberlist)
# memberlist [![GoDoc](https://godoc.org/github.com/hashicorp/memberlist?status.png)](https://godoc.org/github.com/hashicorp/memberlist) [![Build Status](https://travis-ci.org/hashicorp/memberlist.svg?branch=master)](https://travis-ci.org/hashicorp/memberlist)
memberlist is a [Go](http://www.golang.org) library that manages cluster
membership and member failure detection using a gossip based protocol.

View File

@ -29,6 +29,11 @@ func (b *memberlistBroadcast) Invalidates(other Broadcast) bool {
return b.node == mb.node
// memberlist.NamedBroadcast optional interface
func (b *memberlistBroadcast) Name() string {
return b.node
func (b *memberlistBroadcast) Message() []byte {
return b.msg

vendor/github.com/hashicorp/memberlist/go.mod generated vendored Normal file
View File

@ -0,0 +1,24 @@
module github.com/hashicorp/memberlist
require (
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-sockaddr v0.0.0-20190103214136-e92cdb5343bb
github.com/kr/pretty v0.1.0 // indirect
github.com/miekg/dns v1.0.14
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529
github.com/stretchr/testify v1.2.2
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3 // indirect
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 // indirect
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5 // indirect
google.golang.org/appengine v1.4.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/vmihailenco/msgpack.v2 v2.9.1 // indirect

vendor/github.com/hashicorp/memberlist/go.sum generated vendored Normal file
View File

@ -0,0 +1,53 @@
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c h1:BTAbnbegUIMB6xmQCwWE8yRzbA4XSpnZY5hvRJC188I=
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-sockaddr v0.0.0-20190103214136-e92cdb5343bb h1:YrwA8w5SBkUIH5BzN2pMYhno+txUCOD5+PVXwLS6ddI=
github.com/hashicorp/go-sockaddr v0.0.0-20190103214136-e92cdb5343bb/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3 h1:KYQXGkl6vs02hK7pK4eIbw0NpNPedieTSTEiJ//bwGs=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519 h1:x6rhz8Y9CjbgQkccRGmELH6K+LJj7tOoh3XWeC1yaQM=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5 h1:x6r4Jo0KNzOOzYd8lbcRsqjuqEASK6ob3auvWYM4/8U=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/vmihailenco/msgpack.v2 v2.9.1 h1:kb0VV7NuIojvRfzwslQeP3yArBqJHW9tOl4t38VS1jM=
gopkg.in/vmihailenco/msgpack.v2 v2.9.1/go.mod h1:/3Dn1Npt9+MYyLpYYXjInO/5jvMLamn+AEGwNEOatn8=

View File

@ -665,3 +665,27 @@ func (m *Memberlist) hasShutdown() bool {
func (m *Memberlist) hasLeft() bool {
return atomic.LoadInt32(&m.leave) == 1
func (m *Memberlist) getNodeState(addr string) nodeStateType {
defer m.nodeLock.RUnlock()
n := m.nodeMap[addr]
return n.State
func (m *Memberlist) getNodeStateChange(addr string) time.Time {
defer m.nodeLock.RUnlock()
n := m.nodeMap[addr]
return n.StateChange
func (m *Memberlist) changeNode(addr string, f func(*nodeState)) {
defer m.nodeLock.Unlock()
n := m.nodeMap[addr]

View File

@ -221,6 +221,16 @@ func (t *NetTransport) Shutdown() error {
// and hands them off to the stream channel.
func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
defer t.wg.Done()
// baseDelay is the initial delay after an AcceptTCP() error before attempting again
const baseDelay = 5 * time.Millisecond
// maxDelay is the maximum delay after an AcceptTCP() error before attempting again.
// In the case that tcpListen() is error-looping, it will delay the shutdown check.
// Therefore, changes to maxDelay may have an effect on the latency of shutdown.
const maxDelay = 1 * time.Second
var loopDelay time.Duration
for {
conn, err := tcpLn.AcceptTCP()
if err != nil {
@ -228,9 +238,22 @@ func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
if loopDelay == 0 {
loopDelay = baseDelay
} else {
loopDelay *= 2
if loopDelay > maxDelay {
loopDelay = maxDelay
t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err)
// No error, reset loop delay
loopDelay = 0
t.streamCh <- conn

View File

@ -1,8 +1,10 @@
package memberlist
import (
// TransmitLimitedQueue is used to queue messages to broadcast to
@ -19,15 +21,93 @@ type TransmitLimitedQueue struct {
// number of retransmissions attempted.
RetransmitMult int
bcQueue limitedBroadcasts
mu sync.Mutex
tq *btree.BTree // stores *limitedBroadcast as btree.Item
tm map[string]*limitedBroadcast
idGen int64
type limitedBroadcast struct {
transmits int // Number of transmissions attempted.
transmits int // btree-key[0]: Number of transmissions attempted.
msgLen int64 // btree-key[1]: copied from len(b.Message())
id int64 // btree-key[2]: unique incrementing id stamped at submission time
b Broadcast
name string // set if Broadcast is a NamedBroadcast
// Less tests whether the current item is less than the given argument.
// This must provide a strict weak ordering.
// If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only
// hold one of either a or b in the tree).
// default ordering is
// - [transmits=0, ..., transmits=inf]
// - [transmits=0:len=999, ..., transmits=0:len=2, ...]
// - [transmits=0:len=999,id=999, ..., transmits=0:len=999:id=1, ...]
func (b *limitedBroadcast) Less(than btree.Item) bool {
o := than.(*limitedBroadcast)
if b.transmits < o.transmits {
return true
} else if b.transmits > o.transmits {
return false
if b.msgLen > o.msgLen {
return true
} else if b.msgLen < o.msgLen {
return false
return b.id > o.id
// for testing; emits in transmit order if reverse=false
func (q *TransmitLimitedQueue) orderedView(reverse bool) []*limitedBroadcast {
defer q.mu.Unlock()
out := make([]*limitedBroadcast, 0, q.lenLocked())
q.walkReadOnlyLocked(reverse, func(cur *limitedBroadcast) bool {
out = append(out, cur)
return true
return out
// walkReadOnlyLocked calls f for each item in the queue traversing it in
// natural order (by Less) when reverse=false and the opposite when true. You
// must hold the mutex.
// This method panics if you attempt to mutate the item during traversal. The
// underlying btree should also not be mutated during traversal.
func (q *TransmitLimitedQueue) walkReadOnlyLocked(reverse bool, f func(*limitedBroadcast) bool) {
if q.lenLocked() == 0 {
iter := func(item btree.Item) bool {
cur := item.(*limitedBroadcast)
prevTransmits := cur.transmits
prevMsgLen := cur.msgLen
prevID := cur.id
keepGoing := f(cur)
if prevTransmits != cur.transmits || prevMsgLen != cur.msgLen || prevID != cur.id {
panic("edited queue while walking read only")
return keepGoing
if reverse {
q.tq.Descend(iter) // end with transmit 0
} else {
q.tq.Ascend(iter) // start with transmit 0
type limitedBroadcasts []*limitedBroadcast
// Broadcast is something that can be broadcasted via gossip to
// the memberlist cluster.
@ -45,123 +125,298 @@ type Broadcast interface {
// NamedBroadcast is an optional extension of the Broadcast interface that
// gives each message a unique string name, and that is used to optimize
// You shoud ensure that Invalidates() checks the same uniqueness as the
// example below:
// func (b *foo) Invalidates(other Broadcast) bool {
// nb, ok := other.(NamedBroadcast)
// if !ok {
// return false
// }
// return b.Name() == nb.Name()
// }
// Invalidates() isn't currently used for NamedBroadcasts, but that may change
// in the future.
type NamedBroadcast interface {
// The unique identity of this broadcast message.
Name() string
// UniqueBroadcast is an optional interface that indicates that each message is
// intrinsically unique and there is no need to scan the broadcast queue for
// duplicates.
// You should ensure that Invalidates() always returns false if implementing
// this interface. Invalidates() isn't currently used for UniqueBroadcasts, but
// that may change in the future.
type UniqueBroadcast interface {
// UniqueBroadcast is just a marker method for this interface.
// QueueBroadcast is used to enqueue a broadcast
func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
defer q.Unlock()
q.queueBroadcast(b, 0)
// Check if this message invalidates another
n := len(q.bcQueue)
for i := 0; i < n; i++ {
if b.Invalidates(q.bcQueue[i].b) {
copy(q.bcQueue[i:], q.bcQueue[i+1:])
q.bcQueue[n-1] = nil
q.bcQueue = q.bcQueue[:n-1]
// lazyInit initializes internal data structures the first time they are
// needed. You must already hold the mutex.
func (q *TransmitLimitedQueue) lazyInit() {
if q.tq == nil {
q.tq = btree.New(32)
if q.tm == nil {
q.tm = make(map[string]*limitedBroadcast)
// queueBroadcast is like QueueBroadcast but you can use a nonzero value for
// the initial transmit tier assigned to the message. This is meant to be used
// for unit testing.
func (q *TransmitLimitedQueue) queueBroadcast(b Broadcast, initialTransmits int) {
defer q.mu.Unlock()
if q.idGen == math.MaxInt64 {
// it's super duper unlikely to wrap around within the retransmit limit
q.idGen = 1
} else {
id := q.idGen
lb := &limitedBroadcast{
transmits: initialTransmits,
msgLen: int64(len(b.Message())),
id: id,
b: b,
unique := false
if nb, ok := b.(NamedBroadcast); ok {
lb.name = nb.Name()
} else if _, ok := b.(UniqueBroadcast); ok {
unique = true
// Check if this message invalidates another.
if lb.name != "" {
if old, ok := q.tm[lb.name]; ok {
} else if !unique {
// Slow path, hopefully nothing hot hits this.
var remove []*limitedBroadcast
q.tq.Ascend(func(item btree.Item) bool {
cur := item.(*limitedBroadcast)
// Special Broadcasts can only invalidate each other.
switch cur.b.(type) {
case NamedBroadcast:
// noop
case UniqueBroadcast:
// noop
if b.Invalidates(cur.b) {
remove = append(remove, cur)
return true
for _, cur := range remove {
// Append to the queue
q.bcQueue = append(q.bcQueue, &limitedBroadcast{0, b})
// Append to the relevant queue.
// deleteItem removes the given item from the overall datastructure. You
// must already hold the mutex.
func (q *TransmitLimitedQueue) deleteItem(cur *limitedBroadcast) {
_ = q.tq.Delete(cur)
if cur.name != "" {
delete(q.tm, cur.name)
if q.tq.Len() == 0 {
// At idle there's no reason to let the id generator keep going
// indefinitely.
q.idGen = 0
// addItem adds the given item into the overall datastructure. You must already
// hold the mutex.
func (q *TransmitLimitedQueue) addItem(cur *limitedBroadcast) {
_ = q.tq.ReplaceOrInsert(cur)
if cur.name != "" {
q.tm[cur.name] = cur
// getTransmitRange returns a pair of min/max values for transmit values
// represented by the current queue contents. Both values represent actual
// transmit values on the interval [0, len). You must already hold the mutex.
func (q *TransmitLimitedQueue) getTransmitRange() (minTransmit, maxTransmit int) {
if q.lenLocked() == 0 {
return 0, 0
minItem, maxItem := q.tq.Min(), q.tq.Max()
if minItem == nil || maxItem == nil {
return 0, 0
min := minItem.(*limitedBroadcast).transmits
max := maxItem.(*limitedBroadcast).transmits
return min, max
// GetBroadcasts is used to get a number of broadcasts, up to a byte limit
// and applying a per-message overhead as provided.
func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte {
defer q.Unlock()
defer q.mu.Unlock()
// Fast path the default case
if len(q.bcQueue) == 0 {
if q.lenLocked() == 0 {
return nil
transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes())
bytesUsed := 0
var toSend [][]byte
for i := len(q.bcQueue) - 1; i >= 0; i-- {
// Check if this is within our limits
b := q.bcQueue[i]
msg := b.b.Message()
if bytesUsed+overhead+len(msg) > limit {
var (
bytesUsed int
toSend [][]byte
reinsert []*limitedBroadcast
// Visit fresher items first, but only look at stuff that will fit.
// We'll go tier by tier, grabbing the largest items first.
minTr, maxTr := q.getTransmitRange()
for transmits := minTr; transmits <= maxTr; /*do not advance automatically*/ {
free := int64(limit - bytesUsed - overhead)
if free <= 0 {
break // bail out early
// Search for the least element on a given tier (by transmit count) as
// defined in the limitedBroadcast.Less function that will fit into our
// remaining space.
greaterOrEqual := &limitedBroadcast{
transmits: transmits,
msgLen: free,
id: math.MaxInt64,
lessThan := &limitedBroadcast{
transmits: transmits + 1,
msgLen: math.MaxInt64,
id: math.MaxInt64,
var keep *limitedBroadcast
q.tq.AscendRange(greaterOrEqual, lessThan, func(item btree.Item) bool {
cur := item.(*limitedBroadcast)
// Check if this is within our limits
if int64(len(cur.b.Message())) > free {
// If this happens it's a bug in the datastructure or
// surrounding use doing something like having len(Message())
// change over time. There's enough going on here that it's
// probably sane to just skip it and move on for now.
return true
keep = cur
return false
if keep == nil {
// No more items of an appropriate size in the tier.
msg := keep.b.Message()
// Add to slice to send
bytesUsed += overhead + len(msg)
toSend = append(toSend, msg)
// Check if we should stop transmission
if b.transmits >= transmitLimit {
n := len(q.bcQueue)
q.bcQueue[i], q.bcQueue[n-1] = q.bcQueue[n-1], nil
q.bcQueue = q.bcQueue[:n-1]
if keep.transmits+1 >= transmitLimit {
} else {
// We need to bump this item down to another transmit tier, but
// because it would be in the same direction that we're walking the
// tiers, we will have to delay the reinsertion until we are
// finished our search. Otherwise we'll possibly re-add the message
// when we ascend to the next tier.
reinsert = append(reinsert, keep)
// If we are sending anything, we need to re-sort to deal
// with adjusted transmit counts
if len(toSend) > 0 {
for _, cur := range reinsert {
return toSend
// NumQueued returns the number of queued messages
func (q *TransmitLimitedQueue) NumQueued() int {
defer q.Unlock()
return len(q.bcQueue)
defer q.mu.Unlock()
return q.lenLocked()
// Reset clears all the queued messages
func (q *TransmitLimitedQueue) Reset() {
defer q.Unlock()
for _, b := range q.bcQueue {
// lenLocked returns the length of the overall queue datastructure. You must
// hold the mutex.
func (q *TransmitLimitedQueue) lenLocked() int {
if q.tq == nil {
return 0
q.bcQueue = nil
return q.tq.Len()
// Reset clears all the queued messages. Should only be used for tests.
func (q *TransmitLimitedQueue) Reset() {
defer q.mu.Unlock()
q.walkReadOnlyLocked(false, func(cur *limitedBroadcast) bool {
return true
q.tq = nil
q.tm = nil
q.idGen = 0
// Prune will retain the maxRetain latest messages, and the rest
// will be discarded. This can be used to prevent unbounded queue sizes
func (q *TransmitLimitedQueue) Prune(maxRetain int) {
defer q.Unlock()
defer q.mu.Unlock()
// Do nothing if queue size is less than the limit
n := len(q.bcQueue)
if n < maxRetain {
for q.tq.Len() > maxRetain {
item := q.tq.Max()
if item == nil {
cur := item.(*limitedBroadcast)
// Invalidate the messages we will be removing
for i := 0; i < n-maxRetain; i++ {
// Move the messages, and retain only the last maxRetain
copy(q.bcQueue[0:], q.bcQueue[n-maxRetain:])
q.bcQueue = q.bcQueue[:maxRetain]
func (b limitedBroadcasts) Len() int {
return len(b)
func (b limitedBroadcasts) Less(i, j int) bool {
return b[i].transmits < b[j].transmits
func (b limitedBroadcasts) Swap(i, j int) {
b[i], b[j] = b[j], b[i]
func (b limitedBroadcasts) Sort() {

View File

@ -233,6 +233,15 @@ START:
// probeNodeByAddr just safely calls probeNode given only the address of the node (for tests)
func (m *Memberlist) probeNodeByAddr(addr string) {
n := m.nodeMap[addr]
// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())

View File

@ -78,10 +78,9 @@ func retransmitLimit(retransmitMult, n int) int {
// shuffleNodes randomly shuffles the input nodes using the Fisher-Yates shuffle
func shuffleNodes(nodes []*nodeState) {
n := len(nodes)
for i := n - 1; i > 0; i-- {
j := rand.Intn(i + 1)
rand.Shuffle(n, func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
// pushPushScale is used to scale the time interval at which push/pull

View File

@ -16,6 +16,9 @@ func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
return false
// implements memberlist.UniqueBroadcast
func (b *broadcast) UniqueBroadcast() {}
func (b *broadcast) Message() []byte {
return b.msg

vendor/vendor.json vendored
View File

@ -126,7 +126,7 @@
@ -146,12 +146,12 @@