From f144d17b1cd7ca9d776a574333694312ab4d8e27 Mon Sep 17 00:00:00 2001 From: Derek Chiang Date: Tue, 28 Apr 2015 21:47:41 -0400 Subject: [PATCH] Address comments --- command/agent/agent.go | 15 +++++++-------- command/agent/command.go | 3 --- command/agent/local_test.go | 14 ++++++++++++-- consul/coordinate_endpoint.go | 7 ++----- consul/coordinate_endpoint_test.go | 26 ++++++++++++++++++++++++-- 5 files changed, 45 insertions(+), 20 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index b7a84521f3..9d49d14748 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -38,9 +38,6 @@ const ( "but no reason was provided. This is a default message." defaultServiceMaintReason = "Maintenance mode is enabled for this " + "service, but no reason was provided. This is a default message." - - // An interval used to send network coordinates to servers - syncCoordinateStaggerIntv = 15 * time.Second ) var ( @@ -201,6 +198,9 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { return nil, err } + // Start sending network coordinates to servers + go agent.sendCoordinates() + return agent, nil } @@ -560,15 +560,14 @@ func (a *Agent) ResumeSync() { a.state.Resume() } -// SendCoordinates starts a loop that periodically sends the local coordinate +// sendCoordinates starts a loop that periodically sends the local coordinate // to a server -func (a *Agent) SendCoordinates() { +func (a *Agent) sendCoordinates() { for { intv := aeScale(a.config.SyncCoordinateInterval, len(a.LANMembers())) intv = intv + randomStagger(intv) - timer := time.After(intv) select { - case <-timer: + case <-time.After(intv): var c *coordinate.Coordinate if a.config.Server { c = a.server.GetLANCoordinate() @@ -585,7 +584,7 @@ func (a *Agent) SendCoordinates() { var reply struct{} if err := a.RPC("Coordinate.Update", &req, &reply); err != nil { - a.logger.Printf("[ERR] coordinate update error: %s", err.Error()) + a.logger.Printf("[ERR] agent: coordinate update error: %s", err.Error()) } case <-a.shutdownCh: return diff --git a/command/agent/command.go b/command/agent/command.go index 89d2ac7d97..2d642b20e3 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -709,9 +709,6 @@ func (c *Command) Run(args []string) int { errWanCh := make(chan struct{}) go c.retryJoinWan(config, errWanCh) - // Start sending network coordinates to servers - go c.agent.SendCoordinates() - // Wait for exit return c.handleSignals(config, errCh, errWanCh) } diff --git a/command/agent/local_test.go b/command/agent/local_test.go index c307e0a75b..3d78038e4a 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -822,8 +822,6 @@ func TestAgentSendCoordinates(t *testing.T) { testutil.WaitForLeader(t, agent1.RPC, "dc1") - go agent1.SendCoordinates() - go agent2.SendCoordinates() time.Sleep(100 * time.Millisecond) var reply structs.IndexedCoordinate @@ -837,4 +835,16 @@ func TestAgentSendCoordinates(t *testing.T) { if reply.Coord == nil { t.Fatalf("should get a coordinate") } + + var reply2 structs.IndexedCoordinate + req2 := structs.CoordinateGetRequest{ + Datacenter: agent2.config.Datacenter, + Node: agent2.config.NodeName, + } + if err := agent1.RPC("Coordinate.Get", &req2, &reply2); err != nil { + t.Fatalf("err: %v", err) + } + if reply2.Coord == nil { + t.Fatalf("should get a coordinate") + } } diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index dda85e9cd7..64b56bb234 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -8,11 +8,7 @@ type Coordinate struct { srv *Server } -// Get returns the the coordinate or a node. -// -// If the node is in the same datacenter, then the LAN coordinate of the node is -// returned. If the node is in a remote DC, then the WAN coordinate of the node -// is returned. +// Get returns the the LAN coordinate of a node. func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.IndexedCoordinate) error { if done, err := c.srv.forward("Coordinate.Get", args, args, reply); done { return err @@ -30,6 +26,7 @@ func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.Inde }) } +// Update updates the the LAN coordinate of a node. func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error { if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { return err diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go index b08bab7999..a5da3d55d8 100644 --- a/consul/coordinate_endpoint_test.go +++ b/consul/coordinate_endpoint_test.go @@ -18,7 +18,7 @@ func getRandomCoordinate() *coordinate.Coordinate { n := 5 clients := make([]*coordinate.Client, n) for i := 0; i < n; i++ { - clients[i] = coordinate.NewClient(config) + clients[i], _ = coordinate.NewClient(config) } for i := 0; i < n*100; i++ { @@ -41,7 +41,7 @@ func coordinatesEqual(a, b *coordinate.Coordinate) bool { return dist < 0.1 } -func TestCoordinate(t *testing.T) { +func TestCoordinateUpdate(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -71,6 +71,28 @@ func TestCoordinate(t *testing.T) { if !coordinatesEqual(d.Coord, arg.Coord) { t.Fatalf("should be equal\n%v\n%v", d.Coord, arg.Coord) } +} + +func TestCoordinateGet(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + arg := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node1", + Op: structs.CoordinateSet, + Coord: getRandomCoordinate(), + } + + var out struct{} + if err := client.Call("Coordinate.Update", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } // Get via RPC var out2 *structs.IndexedCoordinate