Updates Serf to pick up small fixes and coordinate NaN/Inf defenses.

pull/3076/head
James Phillips 2017-05-25 16:16:37 -07:00
parent 7d2b473101
commit 9aba84eb1e
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
7 changed files with 153 additions and 73 deletions

View File

@ -34,10 +34,20 @@ type Client struct {
// value to determine how many samples we keep, per node.
latencyFilterSamples map[string][]float64
// stats is used to record events that occur when updating coordinates.
stats ClientStats
// mutex enables safe concurrent access to the client.
mutex sync.RWMutex
}
// ClientStats is used to record events that occur when updating coordinates.
type ClientStats struct {
// Resets is incremented any time we reset our local coordinate because
// our calculations have resulted in an invalid state.
Resets int
}
// NewClient creates a new Client and verifies the configuration is valid.
func NewClient(config *Config) (*Client, error) {
if !(config.Dimensionality > 0) {
@ -63,11 +73,16 @@ func (c *Client) GetCoordinate() *Coordinate {
}
// SetCoordinate forces the client's coordinate to a known state.
func (c *Client) SetCoordinate(coord *Coordinate) {
func (c *Client) SetCoordinate(coord *Coordinate) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if err := c.checkCoordinate(coord); err != nil {
return err
}
c.coord = coord.Clone()
return nil
}
// ForgetNode removes any client state for the given node.
@ -78,6 +93,29 @@ func (c *Client) ForgetNode(node string) {
delete(c.latencyFilterSamples, node)
}
// Stats returns a copy of stats for the client.
func (c *Client) Stats() ClientStats {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.stats
}
// checkCoordinate returns an error if the coordinate isn't compatible with
// this client, or if the coordinate itself isn't valid. This assumes the mutex
// has been locked already.
func (c *Client) checkCoordinate(coord *Coordinate) error {
if !c.coord.IsCompatibleWith(coord) {
return fmt.Errorf("dimensions aren't compatible")
}
if !coord.IsValid() {
return fmt.Errorf("coordinate is invalid")
}
return nil
}
// latencyFilter applies a simple moving median filter with a new sample for
// a node. This assumes that the mutex has been locked already.
func (c *Client) latencyFilter(node string, rttSeconds float64) float64 {
@ -159,15 +197,24 @@ func (c *Client) updateGravity() {
// Update takes other, a coordinate for another node, and rtt, a round trip
// time observation for a ping to that node, and updates the estimated position of
// the client's coordinate. Returns the updated coordinate.
func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) *Coordinate {
func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coordinate, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
if err := c.checkCoordinate(other); err != nil {
return nil, err
}
rttSeconds := c.latencyFilter(node, rtt.Seconds())
c.updateVivaldi(other, rttSeconds)
c.updateAdjustment(other, rttSeconds)
c.updateGravity()
return c.coord.Clone()
if !c.coord.IsValid() {
c.stats.Resets++
c.coord = NewCoordinate(c.config)
}
return c.coord.Clone(), nil
}
// DistanceTo returns the estimated RTT from the client's coordinate to other, the

View File

@ -72,6 +72,26 @@ func (c *Coordinate) Clone() *Coordinate {
}
}
// componentIsValid returns false if a floating point value is a NaN or an
// infinity.
func componentIsValid(f float64) bool {
return !math.IsInf(f, 0) && !math.IsNaN(f)
}
// IsValid returns false if any component of a coordinate isn't valid, per the
// componentIsValid() helper above.
func (c *Coordinate) IsValid() bool {
for i := range c.Vec {
if !componentIsValid(c.Vec[i]) {
return false
}
}
return componentIsValid(c.Error) &&
componentIsValid(c.Adjustment) &&
componentIsValid(c.Height)
}
// IsCompatibleWith checks to see if the two coordinates are compatible
// dimensionally. If this returns true then you are guaranteed to not get
// any runtime errors operating on them.
@ -122,7 +142,7 @@ func (c *Coordinate) rawDistanceTo(other *Coordinate) float64 {
// already been checked to be compatible.
func add(vec1 []float64, vec2 []float64) []float64 {
ret := make([]float64, len(vec1))
for i, _ := range ret {
for i := range ret {
ret[i] = vec1[i] + vec2[i]
}
return ret
@ -132,7 +152,7 @@ func add(vec1 []float64, vec2 []float64) []float64 {
// dimensions have already been checked to be compatible.
func diff(vec1 []float64, vec2 []float64) []float64 {
ret := make([]float64, len(vec1))
for i, _ := range ret {
for i := range ret {
ret[i] = vec1[i] - vec2[i]
}
return ret
@ -141,7 +161,7 @@ func diff(vec1 []float64, vec2 []float64) []float64 {
// mul returns vec multiplied by a scalar factor.
func mul(vec []float64, factor float64) []float64 {
ret := make([]float64, len(vec))
for i, _ := range vec {
for i := range vec {
ret[i] = vec[i] * factor
}
return ret
@ -150,7 +170,7 @@ func mul(vec []float64, factor float64) []float64 {
// magnitude computes the magnitude of the vec.
func magnitude(vec []float64) float64 {
sum := 0.0
for i, _ := range vec {
for i := range vec {
sum += vec[i] * vec[i]
}
return math.Sqrt(sum)
@ -168,7 +188,7 @@ func unitVectorAt(vec1 []float64, vec2 []float64) ([]float64, float64) {
}
// Otherwise, just return a random unit vector.
for i, _ := range ret {
for i := range ret {
ret[i] = rand.Float64() - 0.5
}
if mag := magnitude(ret); mag > zeroThreshold {

View File

@ -62,28 +62,29 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
var coord coordinate.Coordinate
if err := dec.Decode(&coord); err != nil {
log.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err)
return
}
// Apply the update. Since this is a coordinate coming from some place
// else we harden this and look for dimensionality problems proactively.
// Apply the update.
before := p.serf.coordClient.GetCoordinate()
if before.IsCompatibleWith(&coord) {
after := p.serf.coordClient.Update(other.Name, &coord, rtt)
// Publish some metrics to give us an idea of how much we are
// adjusting each time we update.
d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
// Cache the coordinate for the other node, and add our own
// to the cache as well since it just got updated. This lets
// users call GetCachedCoordinate with our node name, which is
// more friendly.
p.serf.coordCacheLock.Lock()
p.serf.coordCache[other.Name] = &coord
p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
p.serf.coordCacheLock.Unlock()
} else {
log.Printf("[ERR] serf: Rejected bad coordinate: %v\n", coord)
after, err := p.serf.coordClient.Update(other.Name, &coord, rtt)
if err != nil {
log.Printf("[ERR] serf: Rejected coordinate from %s: %v\n",
other.Name, err)
return
}
// Publish some metrics to give us an idea of how much we are
// adjusting each time we update.
d := float32(before.DistanceTo(after).Seconds() * 1.0e3)
metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d)
// Cache the coordinate for the other node, and add our own
// to the cache as well since it just got updated. This lets
// users call GetCachedCoordinate with our node name, which is
// more friendly.
p.serf.coordCacheLock.Lock()
p.serf.coordCache[other.Name] = &coord
p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate()
p.serf.coordCacheLock.Unlock()
}

View File

@ -1,6 +1,7 @@
package serf
import (
"errors"
"fmt"
"math"
"math/rand"
@ -148,6 +149,8 @@ func (r *QueryResponse) Deadline() time.Time {
// Finished returns if the query is finished running
func (r *QueryResponse) Finished() bool {
r.closeLock.Lock()
defer r.closeLock.Unlock()
return r.closed || time.Now().After(r.deadline)
}
@ -164,6 +167,22 @@ func (r *QueryResponse) ResponseCh() <-chan NodeResponse {
return r.respCh
}
// sendResponse sends a response on the response channel ensuring the channel is not closed.
func (r *QueryResponse) sendResponse(nr NodeResponse) error {
r.closeLock.Lock()
defer r.closeLock.Unlock()
if r.closed {
return nil
}
select {
case r.respCh <- nr:
r.responses[nr.From] = struct{}{}
default:
return errors.New("serf: Failed to deliver query response, dropping")
}
return nil
}
// NodeResponse is used to represent a single response from a node
type NodeResponse struct {
From string

View File

@ -241,18 +241,13 @@ func Create(conf *Config) (*Serf, error) {
conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
}
if conf.LogOutput != nil && conf.Logger != nil {
return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
}
logDest := conf.LogOutput
if logDest == nil {
logDest = os.Stderr
}
logger := conf.Logger
if logger == nil {
logger = log.New(logDest, "", log.LstdFlags)
logOutput := conf.LogOutput
if logOutput == nil {
logOutput = os.Stderr
}
logger = log.New(logOutput, "", log.LstdFlags)
}
serf := &Serf{
@ -343,21 +338,15 @@ func Create(conf *Config) (*Serf, error) {
// Setup the various broadcast queues, which we use to send our own
// custom broadcasts along the gossip channel.
serf.broadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(serf.members)
},
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(serf.members)
},
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(serf.members)
},
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
@ -807,13 +796,15 @@ func (s *Serf) Shutdown() error {
s.logger.Printf("[WARN] serf: Shutdown without a Leave")
}
// Wait to close the shutdown channel until after we've shut down the
// memberlist and its associated network resources, since the shutdown
// channel signals that we are cleaned up outside of Serf.
s.state = SerfShutdown
close(s.shutdownCh)
err := s.memberlist.Shutdown()
if err != nil {
return err
}
close(s.shutdownCh)
// Wait for the snapshoter to finish if we have one
if s.snapshotter != nil {
@ -1323,11 +1314,9 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
}
metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
select {
case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
query.responses[resp.From] = struct{}{}
default:
s.logger.Printf("[WARN] serf: Failed to deliver query response, dropping")
err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload})
if err != nil {
s.logger.Printf("[WARN] %v", err)
}
}
}
@ -1387,7 +1376,7 @@ func (s *Serf) resolveNodeConflict() {
// Update the counters
responses++
if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port {
if member.Addr.Equal(local.Addr) && member.Port == local.Port {
matching++
}
}
@ -1664,17 +1653,18 @@ func (s *Serf) Stats() map[string]string {
return strconv.FormatUint(v, 10)
}
stats := map[string]string{
"members": toString(uint64(len(s.members))),
"failed": toString(uint64(len(s.failedMembers))),
"left": toString(uint64(len(s.leftMembers))),
"health_score": toString(uint64(s.memberlist.GetHealthScore())),
"member_time": toString(uint64(s.clock.Time())),
"event_time": toString(uint64(s.eventClock.Time())),
"query_time": toString(uint64(s.queryClock.Time())),
"intent_queue": toString(uint64(s.broadcasts.NumQueued())),
"event_queue": toString(uint64(s.eventBroadcasts.NumQueued())),
"query_queue": toString(uint64(s.queryBroadcasts.NumQueued())),
"encrypted": fmt.Sprintf("%v", s.EncryptionEnabled()),
"members": toString(uint64(len(s.members))),
"failed": toString(uint64(len(s.failedMembers))),
"left": toString(uint64(len(s.leftMembers))),
"health_score": toString(uint64(s.memberlist.GetHealthScore())),
"member_time": toString(uint64(s.clock.Time())),
"event_time": toString(uint64(s.eventClock.Time())),
"query_time": toString(uint64(s.queryClock.Time())),
"intent_queue": toString(uint64(s.broadcasts.NumQueued())),
"event_queue": toString(uint64(s.eventBroadcasts.NumQueued())),
"query_queue": toString(uint64(s.queryBroadcasts.NumQueued())),
"encrypted": fmt.Sprintf("%v", s.EncryptionEnabled()),
"coordinate_resets": toString(uint64(s.coordClient.Stats().Resets)),
}
return stats
}

View File

@ -532,7 +532,10 @@ func (s *Snapshotter) replay() error {
s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
continue
}
s.coordClient.SetCoordinate(&coord)
if err := s.coordClient.SetCoordinate(&coord); err != nil {
s.logger.Printf("[WARN] serf: Failed to set coordinate: %v", err)
continue
}
} else if line == "leave" {
// Ignore a leave if we plan on re-joining
if s.rejoinAfterLeave {

12
vendor/vendor.json vendored
View File

@ -668,18 +668,18 @@
"revisionTime": "2015-02-01T20:08:39Z"
},
{
"checksumSHA1": "E3Xcanc9ouQwL+CZGOUyA/+giLg=",
"checksumSHA1": "/oss17GO4hXGM7QnUdI3VzcAHzA=",
"comment": "v0.7.0-66-g6c4672d",
"path": "github.com/hashicorp/serf/coordinate",
"revision": "114430d8210835d66defdc31cdc176c58e060005",
"revisionTime": "2016-08-09T01:42:04Z"
"revision": "c2e4be24cdc9031eb0ad869c5d160775efdf7d7a",
"revisionTime": "2017-05-25T23:15:04Z"
},
{
"checksumSHA1": "AZ4RoXStVz6qx38ZMZAyC6Gw3Q4=",
"checksumSHA1": "cOk2eJmnkqSyA0utcLlzWMFDwXg=",
"comment": "v0.7.0-66-g6c4672d",
"path": "github.com/hashicorp/serf/serf",
"revision": "c5e26c3704ca774760df65ee8cbb039d9d9ec560",
"revisionTime": "2017-02-08T21:49:39Z"
"revision": "c2e4be24cdc9031eb0ad869c5d160775efdf7d7a",
"revisionTime": "2017-05-25T23:15:04Z"
},
{
"checksumSHA1": "ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=",