Does some small cleanups based on PR feedback.

* Holds coordinate updates in map and gets rid of the update channel.
* Cleans up config variables a bit.
pull/1331/head
James Phillips 2015-06-29 15:53:29 -07:00
parent 7e6d52109b
commit 5f754c4a87
7 changed files with 126 additions and 119 deletions

View File

@ -572,21 +572,27 @@ func (a *Agent) sendCoordinate() {
select {
case <-time.After(intv):
var c *coordinate.Coordinate
var err error
if a.config.Server {
c = a.server.GetLANCoordinate()
c, err = a.server.GetLANCoordinate()
} else {
c = a.client.GetCoordinate()
c, err = a.client.GetCoordinate()
}
if err != nil {
a.logger.Printf("[ERR] agent: failed to get coordinate: %s", err)
continue
}
req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
Coord: c,
WriteRequest: structs.WriteRequest{Token: a.config.ACLToken},
}
var reply struct{}
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
a.logger.Printf("[ERR] agent: coordinate update error: %s", err)
continue
}
case <-a.shutdownCh:
return

View File

@ -9,7 +9,6 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/coordinate"
)
func TestAgentAntiEntropy_Services(t *testing.T) {
@ -809,7 +808,8 @@ func TestAgent_sendCoordinate(t *testing.T) {
conf := nextConfig()
conf.SyncCoordinateInterval = 10 * time.Millisecond
conf.ConsulConfig.CoordinateUpdatePeriod = 100 * time.Millisecond
conf.ConsulConfig.CoordinateUpdateMaxBatchSize = 20
conf.ConsulConfig.CoordinateUpdateBatchSize = 15
conf.ConsulConfig.CoordinateUpdateMaxBatches = 1
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
@ -831,47 +831,4 @@ func TestAgent_sendCoordinate(t *testing.T) {
if reply.Coord == nil {
t.Fatalf("should get a coordinate")
}
// Start spamming for a little while to get rate limit errors back from
// the server.
conf.SyncCoordinateInterval = 1 * time.Millisecond
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
// Slow down and let the server catch up.
conf.SyncCoordinateInterval = 10 * time.Millisecond
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
// Inject a sentinel coordinate so we can confirm that the periodic process
// is still able to update it.
sentinel := coordinate.NewCoordinate(coordinate.DefaultConfig())
sentinel.Vec[0] = 23.0
func() {
req := structs.CoordinateUpdateRequest{
Datacenter: agent.config.Datacenter,
Node: agent.config.NodeName,
Coord: sentinel,
WriteRequest: structs.WriteRequest{Token: agent.config.ACLToken},
}
var reply struct{}
if err := agent.RPC("Coordinate.Update", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
}()
// Wait a little while for the injected update, as well as periodic ones
// to fire.
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
// Make sure the injected coordinate is not the one that's present since
// there should have been some more periodic updates.
req = structs.NodeSpecificRequest{
Datacenter: agent.config.Datacenter,
Node: agent.config.NodeName,
}
if err := agent.RPC("Coordinate.Get", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reflect.DeepEqual(sentinel, reply.Coord) {
t.Fatalf("should not have gotten the sentinel coordinate")
}
}

View File

@ -382,6 +382,6 @@ func (c *Client) Stats() map[string]map[string]string {
// GetCoordinate returns the network coordinate of the current node, as
// maintained by Serf.
func (c *Client) GetCoordinate() *coordinate.Coordinate {
func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) {
return c.serf.GetCoordinate()
}

View File

@ -212,9 +212,14 @@ type Config struct {
// being more stale.
CoordinateUpdatePeriod time.Duration
// CoordinateUpdateMaxBatchSize controls the maximum number of updates a
// CoordinateUpdateBatchSize controls the maximum number of updates a
// server batches before applying them in a Raft transaction.
CoordinateUpdateMaxBatchSize int
CoordinateUpdateBatchSize int
// CoordinateUpdateMaxBatches controls the maximum number of batches we
// are willing to apply in one period. After this limit we will issue a
// warning and discard the remaining updates.
CoordinateUpdateMaxBatches int
}
// CheckVersion is used to check if the ProtocolVersion is valid
@ -274,10 +279,11 @@ func DefaultConfig() *Config {
// SyncCoordinateInterval defaults to 20 seconds, and scales up
// as the number of nodes in the cluster goes up. For 100k nodes,
// it will move up to 201 seconds, which gives an update rate of
// just under 500 updates per second. We will split this into 2
// batches.
CoordinateUpdatePeriod: 500 * time.Millisecond,
CoordinateUpdateMaxBatchSize: 250,
// just under 500 updates per second. With this tuning we will
// apply less than 5 batches per period.
CoordinateUpdatePeriod: 5 * time.Second,
CoordinateUpdateBatchSize: 512,
CoordinateUpdateMaxBatches: 5,
}
// Increase our reap interval to 3 days instead of 24h.

View File

@ -2,9 +2,11 @@ package consul
import (
"fmt"
"sync"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
)
// Coordinate manages queries and updates for network coordinates.
@ -12,54 +14,90 @@ type Coordinate struct {
// srv is a pointer back to the server.
srv *Server
// updateCh receives coordinate updates and applies them to the raft log
// in batches so that we don't create tons of tiny transactions.
updateCh chan *structs.Coordinate
// updates holds pending coordinate updates for the given nodes.
updates map[string]*coordinate.Coordinate
// updatesLock synchronizes access to the updates map.
updatesLock sync.Mutex
}
// NewCoordinate returns a new Coordinate endpoint.
func NewCoordinate(srv *Server) *Coordinate {
len := srv.config.CoordinateUpdateMaxBatchSize
c := &Coordinate{
srv: srv,
updateCh: make(chan *structs.Coordinate, len),
updates: make(map[string]*coordinate.Coordinate),
}
// This will flush all pending updates at a fixed period.
go func() {
for {
select {
case <-time.After(srv.config.CoordinateUpdatePeriod):
if err := c.batchApplyUpdates(); err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Batch update failed: %v", err)
}
case <-srv.shutdownCh:
return
}
}
}()
go c.batchUpdate()
return c
}
// batchApplyUpdates is a non-blocking routine that applies all pending updates
// to the Raft log.
func (c *Coordinate) batchApplyUpdates() error {
var updates []*structs.Coordinate
for done := false; !done; {
// batchUpdate is a long-running routine that flushes pending coordinates to the
// Raft log in batches.
func (c *Coordinate) batchUpdate() {
for {
select {
case update := <-c.updateCh:
updates = append(updates, update)
default:
done = true
case <-time.After(c.srv.config.CoordinateUpdatePeriod):
if err := c.batchApplyUpdates(); err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Batch update failed: %v", err)
}
case <-c.srv.shutdownCh:
return
}
}
}
// batchApplyUpdates applies all pending updates to the Raft log in a series of batches.
func (c *Coordinate) batchApplyUpdates() error {
c.updatesLock.Lock()
defer c.updatesLock.Unlock()
// No matter what happens in here we should clear out any unprocessed
// updates
defer func() {
if len(c.updates) > 0 {
c.srv.logger.Printf("[ERR] Discarded %d coordinate updates; increase SyncCoordinateInterval", len(c.updates))
c.updates = make(map[string]*coordinate.Coordinate)
}
}()
batch := make([]*structs.Coordinate, 0, c.srv.config.CoordinateUpdateBatchSize)
flushBatch := func() error {
if len(batch) == 0 {
return nil
}
if _, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, batch); err != nil {
return err
}
batch = batch[:0]
return nil
}
// Process up to the max configured number of updates.
remaining := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches
for node, coord := range(c.updates) {
if remaining <= 0 {
break
}
batch = append(batch, &structs.Coordinate{node, coord})
delete(c.updates, node)
remaining--
if len(batch) == c.srv.config.CoordinateUpdateBatchSize {
if err := flushBatch(); err != nil {
return err
}
}
}
if len(updates) > 0 {
if _, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, updates); err != nil {
return err
}
// Flush any leftovers from a partial batch.
if err := flushBatch(); err != nil {
return err
}
return nil
}
@ -71,20 +109,18 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
// Since this is a coordinate coming from some place else we harden this
// and look for dimensionality problems proactively.
if !c.srv.serfLAN.GetCoordinate().IsCompatibleWith(args.Coord) {
coord, err := c.srv.serfLAN.GetCoordinate()
if err != nil {
return err
}
if !coord.IsCompatibleWith(args.Coord) {
return fmt.Errorf("rejected bad coordinate: %v", args.Coord)
}
// Perform a non-blocking write to the channel. We'd rather spill updates
// than gum things up blocking here.
update := &structs.Coordinate{Node: args.Node, Coord: args.Coord}
select {
case c.updateCh <- update:
// This is a noop - we are done if the write went through.
default:
return fmt.Errorf("coordinate update rate limit exceeded, increase SyncCoordinateInterval")
}
// Add the coordinate to the map of pending updates.
c.updatesLock.Lock()
c.updates[args.Node] = args.Coord
c.updatesLock.Unlock()
return nil
}

View File

@ -44,7 +44,8 @@ func TestCoordinate_Update(t *testing.T) {
defer os.RemoveAll(dir1)
config1.CoordinateUpdatePeriod = 500 * time.Millisecond
config1.CoordinateUpdateMaxBatchSize = 5
config1.CoordinateUpdateBatchSize = 5
config1.CoordinateUpdateMaxBatches = 2
s1, err := NewServer(config1)
if err != nil {
t.Fatal(err)
@ -113,33 +114,34 @@ func TestCoordinate_Update(t *testing.T) {
}
verifyCoordinatesEqual(t, c, arg2.Coord)
// Now try spamming coordinates and make sure it starts dropping when
// the pipe is full.
for i := 0; i < s1.config.CoordinateUpdateMaxBatchSize; i++ {
// Now spam some coordinate updates and make sure it starts throwing
// them away if they exceed the batch allowance. Node we have to make
// unique names since these are held in map by node name.
spamLen := s1.config.CoordinateUpdateBatchSize * s1.config.CoordinateUpdateMaxBatches + 1
for i := 0; i < spamLen; i++ {
arg1.Node = fmt.Sprintf("bogusnode%d", i)
arg1.Coord = generateRandomCoordinate()
if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
// This one should get dropped.
arg2.Coord = generateRandomCoordinate()
err = client.Call("Coordinate.Update", &arg2, &out)
if err == nil || !strings.Contains(err.Error(), "rate limit") {
t.Fatalf("should have failed with a rate limit error, got %v", err)
}
// Wait a little while for the batch routine to run, then make sure
// all but the last coordinate update made it in.
// exactly one of the updates got dropped (we won't know which one).
time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
_, c, err = state.CoordinateGet("node1")
if err != nil {
t.Fatalf("err: %v", err)
numDropped := 0
for i := 0; i < spamLen; i++ {
_, c, err = state.CoordinateGet(fmt.Sprintf("bogusnode%d", i))
if err != nil {
t.Fatalf("err: %v", err)
}
if c == nil {
numDropped++
}
}
if c == nil {
t.Fatalf("should return a coordinate but it's nil")
if numDropped != 1 {
t.Fatalf("wrong number of coordinates dropped, %d != 1", numDropped)
}
verifyCoordinatesEqual(t, c, arg1.Coord)
// Finally, send a coordinate with the wrong dimensionality to make sure
// there are no panics, and that it gets rejected.

View File

@ -313,7 +313,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
// number of nodes should be small, and where we serve these directly
// from Serf because they aren't managed in the catalog.
conf.DisableCoordinates = s.config.DisableCoordinates
conf.CacheCoordinates = (!s.config.DisableCoordinates) && wan
conf.CacheCoordinates = wan
return serf.Create(conf)
}
@ -703,11 +703,11 @@ func (s *Server) Stats() map[string]map[string]string {
}
// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
func (s *Server) GetLANCoordinate() *coordinate.Coordinate {
func (s *Server) GetLANCoordinate() (*coordinate.Coordinate, error) {
return s.serfLAN.GetCoordinate()
}
// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool.
func (s *Server) GetWANCoordinate() *coordinate.Coordinate {
func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
return s.serfWAN.GetCoordinate()
}