diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index 6430b06251..b2549fbd6c 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -5,7 +5,6 @@ import ( "time" "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/serf/coordinate" ) type Coordinate struct { @@ -41,16 +40,22 @@ func (c *Coordinate) GetLAN(args *structs.NodeSpecificRequest, reply *structs.In // // Note that the server does not necessarily know about *all* servers in the given datacenter. // It just returns the coordinates of those that it knows. -func (c *Coordinate) GetWAN(args *structs.DCSpecificRequest, reply *[]*coordinate.Coordinate) error { +func (c *Coordinate) GetWAN(args *structs.DCSpecificRequest, reply *structs.CoordinateList) error { if args.Datacenter == c.srv.config.Datacenter { - *reply = make([]*coordinate.Coordinate, 1) - (*reply)[0] = c.srv.GetWANCoordinate() + reply.Coords = make([]structs.Coordinate, 1) + reply.Coords[0] = structs.Coordinate{ + Node: c.srv.config.NodeName, + Coord: c.srv.GetWANCoordinate(), + } } else { servers := c.srv.remoteConsuls[args.Datacenter] // servers in the specified DC - *reply = make([]*coordinate.Coordinate, 0) + reply.Coords = make([]structs.Coordinate, 0) for i := 0; i < len(servers); i++ { if coord := c.srv.serfWAN.GetCachedCoordinate(servers[i].Name); coord != nil { - *reply = append(*reply, coord) + reply.Coords = append(reply.Coords, structs.Coordinate{ + Node: servers[i].Name, + Coord: coord, + }) } } } @@ -58,6 +63,13 @@ func (c *Coordinate) GetWAN(args *structs.DCSpecificRequest, reply *[]*coordinat return nil } +func flushCoordinates(c *Coordinate, buf []*structs.CoordinateUpdateRequest) { + _, err := c.srv.raftApply(structs.CoordinateRequestType, buf) + if err != nil { + c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err) + } +} + // Update updates the the LAN coordinate of a node. func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error { if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { @@ -65,24 +77,18 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct } c.updateBufferLock.Lock() + defer c.updateBufferLock.Unlock() c.updateBuffer = append(c.updateBuffer, args) + if time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod || len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize { c.srv.logger.Printf("sending update for %v", args.Node) // Apply the potentially time-consuming transaction out of band - go func() { - defer c.updateBufferLock.Unlock() - _, err := c.srv.raftApply(structs.CoordinateRequestType, c.updateBuffer) - // We clear the buffer regardless of whether the raft transaction succeeded, just so the - // buffer doesn't keep growing without bound. - c.updateBuffer = nil - c.updateLastSent = time.Now() + go flushCoordinates(c, c.updateBuffer) - if err != nil { - c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err) - } - }() - } else { - c.updateBufferLock.Unlock() + // We clear the buffer regardless of whether the raft transaction succeeded, just so the + // buffer doesn't keep growing without bound. + c.updateLastSent = time.Now() + c.updateBuffer = nil } return nil diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go index 9f6affc245..8c7c7d0b1d 100644 --- a/consul/coordinate_endpoint_test.go +++ b/consul/coordinate_endpoint_test.go @@ -205,14 +205,14 @@ func TestCoordinate_GetWAN(t *testing.T) { // Wait for coordinates to be exchanged time.Sleep(s1.config.SerfWANConfig.MemberlistConfig.ProbeInterval * 2) - var coords []*coordinate.Coordinate + var coords structs.CoordinateList arg := structs.DCSpecificRequest{ Datacenter: "dc1", } if err := client.Call("Coordinate.GetWAN", &arg, &coords); err != nil { t.Fatalf("err: %v", err) } - if len(coords) != 1 { + if len(coords.Coords) != 1 { t.Fatalf("there is 1 server in dc1") } @@ -222,7 +222,7 @@ func TestCoordinate_GetWAN(t *testing.T) { if err := client.Call("Coordinate.GetWAN", &arg, &coords); err != nil { t.Fatalf("err: %v", err) } - if len(coords) != 2 { + if len(coords.Coords) != 2 { t.Fatalf("there are 2 servers in dc2") } } diff --git a/consul/server.go b/consul/server.go index ef82c70c06..cdb51d389e 100644 --- a/consul/server.go +++ b/consul/server.go @@ -406,11 +406,9 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.endpoints.Session = &Session{s} s.endpoints.Internal = &Internal{s} s.endpoints.ACL = &ACL{s} - if s.config.EnableCoordinates { - s.endpoints.Coordinate = &Coordinate{ - srv: s, - updateLastSent: time.Now(), - } + s.endpoints.Coordinate = &Coordinate{ + srv: s, + updateLastSent: time.Now(), } // Register the handlers @@ -421,9 +419,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.rpcServer.Register(s.endpoints.Session) s.rpcServer.Register(s.endpoints.Internal) s.rpcServer.Register(s.endpoints.ACL) - if s.config.EnableCoordinates { - s.rpcServer.Register(s.endpoints.Coordinate) - } + s.rpcServer.Register(s.endpoints.Coordinate) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 2386ed3250..232ac54439 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -626,6 +626,10 @@ type Coordinate struct { Coord *coordinate.Coordinate } +type CoordinateList struct { + Coords []Coordinate +} + type IndexedCoordinate struct { Coord *coordinate.Coordinate QueryMeta