Browse Source

Fix tests

pull/1331/head
Derek Chiang 10 years ago committed by James Phillips
parent
commit
69003310ca
  1. 11
      consul/config.go
  2. 56
      consul/coordinate_endpoint.go
  3. 30
      consul/coordinate_endpoint_test.go
  4. 5
      consul/server.go
  5. 3
      consul/server_test.go

11
consul/config.go

@ -205,6 +205,15 @@ type Config struct {
// EnableCoordinates enables features related to network coordinates.
EnableCoordinates bool
// CoordinateUpdatePeriod controls how long a server batches coordinate updates
// before applying them in a Raft transaction. A larger period leads to fewer
// Raft transactions, but also the stored coordinates being more stale.
CoordinateUpdatePeriod time.Duration
// CoordinateUpdateMaxBatchSize controls the maximum number of updates a
// server batches before applying them in a Raft transaction
CoordinateUpdateMaxBatchSize int
}
// CheckVersion is used to check if the ProtocolVersion is valid
@ -260,6 +269,8 @@ func DefaultConfig() *Config {
TombstoneTTLGranularity: 30 * time.Second,
SessionTTLMin: 10 * time.Second,
EnableCoordinates: true,
CoordinateUpdatePeriod: time.Duration(30) * time.Second,
CoordinateUpdateMaxBatchSize: 1000,
}
// Increase our reap interval to 3 days instead of 24h.

56
consul/coordinate_endpoint.go

@ -1,6 +1,7 @@
package consul
import (
"sync"
"time"
"github.com/hashicorp/consul/consul/structs"
@ -8,22 +9,10 @@ import (
)
type Coordinate struct {
srv *Server
}
var (
// We batch updates and send them together every 30 seconds, or every 1000 updates,
// whichever comes sooner
updatePeriod = time.Duration(30) * time.Second
updateBatchMaxSize = 1000
updateBuffer []*structs.CoordinateUpdateRequest
updateLastSent time.Time
)
func init() {
updateBuffer = nil
updateLastSent = time.Now()
srv *Server
updateLastSent time.Time
updateBuffer []*structs.CoordinateUpdateRequest
updateBufferLock sync.Mutex
}
// Get returns the the LAN coordinate of a node.
@ -39,7 +28,11 @@ func (c *Coordinate) GetLAN(args *structs.NodeSpecificRequest, reply *structs.In
func() error {
idx, coord, err := state.CoordinateGet(args.Node)
reply.Index = idx
reply.Coord = coord.Coord
if coord == nil {
reply.Coord = nil
} else {
reply.Coord = coord.Coord
}
return err
})
}
@ -66,18 +59,25 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
return err
}
updateBuffer = append(updateBuffer, args)
if time.Since(updateLastSent) > updatePeriod || len(updateBuffer) > updateBatchMaxSize {
_, err := c.srv.raftApply(structs.CoordinateRequestType, updateBuffer)
// We clear the buffer regardless of whether the raft transaction succeeded, just so the
// buffer doesn't keep growing without bound.
updateBuffer = nil
updateLastSent = time.Now()
c.updateBufferLock.Lock()
c.updateBuffer = append(c.updateBuffer, args)
if time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod || len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize {
c.srv.logger.Printf("sending update for %v", args.Node)
// Apply the potentially time-consuming transaction out of band
go func() {
defer c.updateBufferLock.Unlock()
_, err := c.srv.raftApply(structs.CoordinateRequestType, c.updateBuffer)
// We clear the buffer regardless of whether the raft transaction succeeded, just so the
// buffer doesn't keep growing without bound.
c.updateBuffer = nil
c.updateLastSent = time.Now()
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
return err
}
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
}
}()
} else {
c.updateBufferLock.Unlock()
}
return nil

30
consul/coordinate_endpoint_test.go

@ -1,6 +1,7 @@
package consul
import (
"fmt"
"math/rand"
"os"
"reflect"
@ -12,11 +13,6 @@ import (
"github.com/hashicorp/serf/coordinate"
)
func init() {
// Shorten updatePeriod so we don't have to wait as long
updatePeriod = time.Duration(100) * time.Millisecond
}
// getRandomCoordinate generates a random coordinate.
func getRandomCoordinate() *coordinate.Coordinate {
config := coordinate.DefaultConfig()
@ -43,9 +39,16 @@ func coordinatesEqual(a, b *coordinate.Coordinate) bool {
}
func TestCoordinateUpdate(t *testing.T) {
dir1, s1 := testServer(t)
name := fmt.Sprintf("Node %d", getPort())
dir1, config1 := testServerConfig(t, name)
config1.CoordinateUpdatePeriod = 1000 * time.Millisecond
s1, err := NewServer(config1)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
@ -65,8 +68,6 @@ func TestCoordinateUpdate(t *testing.T) {
Coord: getRandomCoordinate(),
}
updateLastSent = time.Now()
var out struct{}
if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
t.Fatalf("err: %v", err)
@ -83,10 +84,12 @@ func TestCoordinateUpdate(t *testing.T) {
}
// Wait a while and send another update; this time the updates should be sent
time.Sleep(time.Duration(2) * updatePeriod)
time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
if err := client.Call("Coordinate.Update", &arg2, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Yield the current goroutine to allow the goroutine that sends the updates to run
time.Sleep(100 * time.Millisecond)
_, d, err = state.CoordinateGet("node1")
if err != nil {
@ -112,8 +115,6 @@ func TestCoordinateUpdate(t *testing.T) {
}
func TestCoordinateGetLAN(t *testing.T) {
updatePeriod = time.Duration(0) // to make updates instant
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -133,9 +134,11 @@ func TestCoordinateGetLAN(t *testing.T) {
if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Yield the current goroutine to allow the goroutine that sends the updates to run
time.Sleep(100 * time.Millisecond)
// Get via RPC
var out2 *structs.IndexedCoordinate
out2 := structs.IndexedCoordinate{}
arg2 := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: "node1",
@ -153,6 +156,9 @@ func TestCoordinateGetLAN(t *testing.T) {
if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Yield the current goroutine to allow the goroutine that sends the updates to run
time.Sleep(100 * time.Millisecond)
if err := client.Call("Coordinate.GetLAN", &arg2, &out2); err != nil {
t.Fatalf("err: %v", err)
}

5
consul/server.go

@ -407,7 +407,10 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.endpoints.Internal = &Internal{s}
s.endpoints.ACL = &ACL{s}
if s.config.EnableCoordinates {
s.endpoints.Coordinate = &Coordinate{s}
s.endpoints.Coordinate = &Coordinate{
srv: s,
updateLastSent: time.Now(),
}
}
// Register the handlers

3
consul/server_test.go

@ -66,6 +66,9 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) {
config.RaftConfig.ElectionTimeout = 40 * time.Millisecond
config.ReconcileInterval = 100 * time.Millisecond
config.EnableCoordinates = true
config.CoordinateUpdatePeriod = 0 // make updates instant
return dir, config
}

Loading…
Cancel
Save