diff --git a/consul/config.go b/consul/config.go index fd8c14489d..9ded83e52f 100644 --- a/consul/config.go +++ b/consul/config.go @@ -205,6 +205,15 @@ type Config struct { // EnableCoordinates enables features related to network coordinates. EnableCoordinates bool + + // CoordinateUpdatePeriod controls how long a server batches coordinate updates + // before applying them in a Raft transaction. A larger period leads to fewer + // Raft transactions, but also the stored coordinates being more stale. + CoordinateUpdatePeriod time.Duration + + // CoordinateUpdateMaxBatchSize controls the maximum number of updates a + // server batches before applying them in a Raft transaction + CoordinateUpdateMaxBatchSize int } // CheckVersion is used to check if the ProtocolVersion is valid @@ -260,6 +269,8 @@ func DefaultConfig() *Config { TombstoneTTLGranularity: 30 * time.Second, SessionTTLMin: 10 * time.Second, EnableCoordinates: true, + CoordinateUpdatePeriod: time.Duration(30) * time.Second, + CoordinateUpdateMaxBatchSize: 1000, } // Increase our reap interval to 3 days instead of 24h. diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index 258d366dd6..8769c13876 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -1,6 +1,7 @@ package consul import ( + "sync" "time" "github.com/hashicorp/consul/consul/structs" @@ -8,22 +9,10 @@ import ( ) type Coordinate struct { - srv *Server -} - -var ( - // We batch updates and send them together every 30 seconds, or every 1000 updates, - // whichever comes sooner - updatePeriod = time.Duration(30) * time.Second - updateBatchMaxSize = 1000 - - updateBuffer []*structs.CoordinateUpdateRequest - updateLastSent time.Time -) - -func init() { - updateBuffer = nil - updateLastSent = time.Now() + srv *Server + updateLastSent time.Time + updateBuffer []*structs.CoordinateUpdateRequest + updateBufferLock sync.Mutex } // Get returns the the LAN coordinate of a node. @@ -39,7 +28,11 @@ func (c *Coordinate) GetLAN(args *structs.NodeSpecificRequest, reply *structs.In func() error { idx, coord, err := state.CoordinateGet(args.Node) reply.Index = idx - reply.Coord = coord.Coord + if coord == nil { + reply.Coord = nil + } else { + reply.Coord = coord.Coord + } return err }) } @@ -66,18 +59,25 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct return err } - updateBuffer = append(updateBuffer, args) - if time.Since(updateLastSent) > updatePeriod || len(updateBuffer) > updateBatchMaxSize { - _, err := c.srv.raftApply(structs.CoordinateRequestType, updateBuffer) - // We clear the buffer regardless of whether the raft transaction succeeded, just so the - // buffer doesn't keep growing without bound. - updateBuffer = nil - updateLastSent = time.Now() + c.updateBufferLock.Lock() + c.updateBuffer = append(c.updateBuffer, args) + if time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod || len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize { + c.srv.logger.Printf("sending update for %v", args.Node) + // Apply the potentially time-consuming transaction out of band + go func() { + defer c.updateBufferLock.Unlock() + _, err := c.srv.raftApply(structs.CoordinateRequestType, c.updateBuffer) + // We clear the buffer regardless of whether the raft transaction succeeded, just so the + // buffer doesn't keep growing without bound. + c.updateBuffer = nil + c.updateLastSent = time.Now() - if err != nil { - c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err) - return err - } + if err != nil { + c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err) + } + }() + } else { + c.updateBufferLock.Unlock() } return nil diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go index d10c8ed6d5..4c2f22b6fd 100644 --- a/consul/coordinate_endpoint_test.go +++ b/consul/coordinate_endpoint_test.go @@ -1,6 +1,7 @@ package consul import ( + "fmt" "math/rand" "os" "reflect" @@ -12,11 +13,6 @@ import ( "github.com/hashicorp/serf/coordinate" ) -func init() { - // Shorten updatePeriod so we don't have to wait as long - updatePeriod = time.Duration(100) * time.Millisecond -} - // getRandomCoordinate generates a random coordinate. func getRandomCoordinate() *coordinate.Coordinate { config := coordinate.DefaultConfig() @@ -43,9 +39,16 @@ func coordinatesEqual(a, b *coordinate.Coordinate) bool { } func TestCoordinateUpdate(t *testing.T) { - dir1, s1 := testServer(t) + name := fmt.Sprintf("Node %d", getPort()) + dir1, config1 := testServerConfig(t, name) + config1.CoordinateUpdatePeriod = 1000 * time.Millisecond + s1, err := NewServer(config1) + if err != nil { + t.Fatal(err) + } defer os.RemoveAll(dir1) defer s1.Shutdown() + client := rpcClient(t, s1) defer client.Close() @@ -65,8 +68,6 @@ func TestCoordinateUpdate(t *testing.T) { Coord: getRandomCoordinate(), } - updateLastSent = time.Now() - var out struct{} if err := client.Call("Coordinate.Update", &arg1, &out); err != nil { t.Fatalf("err: %v", err) @@ -83,10 +84,12 @@ func TestCoordinateUpdate(t *testing.T) { } // Wait a while and send another update; this time the updates should be sent - time.Sleep(time.Duration(2) * updatePeriod) + time.Sleep(2 * s1.config.CoordinateUpdatePeriod) if err := client.Call("Coordinate.Update", &arg2, &out); err != nil { t.Fatalf("err: %v", err) } + // Yield the current goroutine to allow the goroutine that sends the updates to run + time.Sleep(100 * time.Millisecond) _, d, err = state.CoordinateGet("node1") if err != nil { @@ -112,8 +115,6 @@ func TestCoordinateUpdate(t *testing.T) { } func TestCoordinateGetLAN(t *testing.T) { - updatePeriod = time.Duration(0) // to make updates instant - dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -133,9 +134,11 @@ func TestCoordinateGetLAN(t *testing.T) { if err := client.Call("Coordinate.Update", &arg, &out); err != nil { t.Fatalf("err: %v", err) } + // Yield the current goroutine to allow the goroutine that sends the updates to run + time.Sleep(100 * time.Millisecond) // Get via RPC - var out2 *structs.IndexedCoordinate + out2 := structs.IndexedCoordinate{} arg2 := structs.NodeSpecificRequest{ Datacenter: "dc1", Node: "node1", @@ -153,6 +156,9 @@ func TestCoordinateGetLAN(t *testing.T) { if err := client.Call("Coordinate.Update", &arg, &out); err != nil { t.Fatalf("err: %v", err) } + // Yield the current goroutine to allow the goroutine that sends the updates to run + time.Sleep(100 * time.Millisecond) + if err := client.Call("Coordinate.GetLAN", &arg2, &out2); err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/server.go b/consul/server.go index eaa3b9bcf8..ef82c70c06 100644 --- a/consul/server.go +++ b/consul/server.go @@ -407,7 +407,10 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.endpoints.Internal = &Internal{s} s.endpoints.ACL = &ACL{s} if s.config.EnableCoordinates { - s.endpoints.Coordinate = &Coordinate{s} + s.endpoints.Coordinate = &Coordinate{ + srv: s, + updateLastSent: time.Now(), + } } // Register the handlers diff --git a/consul/server_test.go b/consul/server_test.go index 380edc4733..56ce691c1c 100644 --- a/consul/server_test.go +++ b/consul/server_test.go @@ -66,6 +66,9 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) { config.RaftConfig.ElectionTimeout = 40 * time.Millisecond config.ReconcileInterval = 100 * time.Millisecond + + config.EnableCoordinates = true + config.CoordinateUpdatePeriod = 0 // make updates instant return dir, config }