Browse Source

Address comments

pull/1331/head
Derek Chiang 10 years ago committed by James Phillips
parent
commit
7255ddd086
  1. 44
      consul/coordinate_endpoint.go
  2. 6
      consul/coordinate_endpoint_test.go
  3. 12
      consul/server.go
  4. 4
      consul/structs/structs.go

44
consul/coordinate_endpoint.go

@ -5,7 +5,6 @@ import (
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
)
type Coordinate struct {
@ -41,16 +40,22 @@ func (c *Coordinate) GetLAN(args *structs.NodeSpecificRequest, reply *structs.In
//
// Note that the server does not necessarily know about *all* servers in the given datacenter.
// It just returns the coordinates of those that it knows.
func (c *Coordinate) GetWAN(args *structs.DCSpecificRequest, reply *[]*coordinate.Coordinate) error {
func (c *Coordinate) GetWAN(args *structs.DCSpecificRequest, reply *structs.CoordinateList) error {
if args.Datacenter == c.srv.config.Datacenter {
*reply = make([]*coordinate.Coordinate, 1)
(*reply)[0] = c.srv.GetWANCoordinate()
reply.Coords = make([]structs.Coordinate, 1)
reply.Coords[0] = structs.Coordinate{
Node: c.srv.config.NodeName,
Coord: c.srv.GetWANCoordinate(),
}
} else {
servers := c.srv.remoteConsuls[args.Datacenter] // servers in the specified DC
*reply = make([]*coordinate.Coordinate, 0)
reply.Coords = make([]structs.Coordinate, 0)
for i := 0; i < len(servers); i++ {
if coord := c.srv.serfWAN.GetCachedCoordinate(servers[i].Name); coord != nil {
*reply = append(*reply, coord)
reply.Coords = append(reply.Coords, structs.Coordinate{
Node: servers[i].Name,
Coord: coord,
})
}
}
}
@ -58,6 +63,13 @@ func (c *Coordinate) GetWAN(args *structs.DCSpecificRequest, reply *[]*coordinat
return nil
}
func flushCoordinates(c *Coordinate, buf []*structs.CoordinateUpdateRequest) {
_, err := c.srv.raftApply(structs.CoordinateRequestType, buf)
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
}
}
// Update updates the the LAN coordinate of a node.
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
@ -65,24 +77,18 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
}
c.updateBufferLock.Lock()
defer c.updateBufferLock.Unlock()
c.updateBuffer = append(c.updateBuffer, args)
if time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod || len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize {
c.srv.logger.Printf("sending update for %v", args.Node)
// Apply the potentially time-consuming transaction out of band
go func() {
defer c.updateBufferLock.Unlock()
_, err := c.srv.raftApply(structs.CoordinateRequestType, c.updateBuffer)
// We clear the buffer regardless of whether the raft transaction succeeded, just so the
// buffer doesn't keep growing without bound.
c.updateBuffer = nil
c.updateLastSent = time.Now()
go flushCoordinates(c, c.updateBuffer)
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
}
}()
} else {
c.updateBufferLock.Unlock()
// We clear the buffer regardless of whether the raft transaction succeeded, just so the
// buffer doesn't keep growing without bound.
c.updateLastSent = time.Now()
c.updateBuffer = nil
}
return nil

6
consul/coordinate_endpoint_test.go

@ -205,14 +205,14 @@ func TestCoordinate_GetWAN(t *testing.T) {
// Wait for coordinates to be exchanged
time.Sleep(s1.config.SerfWANConfig.MemberlistConfig.ProbeInterval * 2)
var coords []*coordinate.Coordinate
var coords structs.CoordinateList
arg := structs.DCSpecificRequest{
Datacenter: "dc1",
}
if err := client.Call("Coordinate.GetWAN", &arg, &coords); err != nil {
t.Fatalf("err: %v", err)
}
if len(coords) != 1 {
if len(coords.Coords) != 1 {
t.Fatalf("there is 1 server in dc1")
}
@ -222,7 +222,7 @@ func TestCoordinate_GetWAN(t *testing.T) {
if err := client.Call("Coordinate.GetWAN", &arg, &coords); err != nil {
t.Fatalf("err: %v", err)
}
if len(coords) != 2 {
if len(coords.Coords) != 2 {
t.Fatalf("there are 2 servers in dc2")
}
}

12
consul/server.go

@ -406,11 +406,9 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.endpoints.Session = &Session{s}
s.endpoints.Internal = &Internal{s}
s.endpoints.ACL = &ACL{s}
if s.config.EnableCoordinates {
s.endpoints.Coordinate = &Coordinate{
srv: s,
updateLastSent: time.Now(),
}
s.endpoints.Coordinate = &Coordinate{
srv: s,
updateLastSent: time.Now(),
}
// Register the handlers
@ -421,9 +419,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.rpcServer.Register(s.endpoints.Session)
s.rpcServer.Register(s.endpoints.Internal)
s.rpcServer.Register(s.endpoints.ACL)
if s.config.EnableCoordinates {
s.rpcServer.Register(s.endpoints.Coordinate)
}
s.rpcServer.Register(s.endpoints.Coordinate)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {

4
consul/structs/structs.go

@ -626,6 +626,10 @@ type Coordinate struct {
Coord *coordinate.Coordinate
}
type CoordinateList struct {
Coords []Coordinate
}
type IndexedCoordinate struct {
Coord *coordinate.Coordinate
QueryMeta

Loading…
Cancel
Save