Browse Source

Makes the version upshift code look at the correct version field.

pull/1353/head
James Phillips 9 years ago
parent
commit
c0bd639662
  1. 30
      command/agent/agent.go
  2. 27
      command/agent/agent_test.go
  3. 24
      consul/util.go
  4. 84
      consul/util_test.go

30
command/agent/agent.go

@ -549,22 +549,6 @@ func (a *Agent) WANMembers() []serf.Member {
}
}
// CanServersUnderstandProtocol checks to see if all the servers understand the
// given protocol version.
func (a *Agent) CanServersUnderstandProtocol(version uint8) bool {
numServers, numWhoGrok := 0, 0
members := a.LANMembers()
for _, member := range members {
if member.Tags["role"] == "consul" {
numServers++
if member.ProtocolMax >= version {
numWhoGrok++
}
}
}
return (numServers > 0) && (numWhoGrok == numServers)
}
// StartSync is called once Services and Checks are registered.
// This is called to prevent a race between clients and the anti-entropy routines
func (a *Agent) StartSync() {
@ -603,13 +587,19 @@ func (a *Agent) sendCoordinate() {
select {
case <-time.After(intv):
if !a.CanServersUnderstandProtocol(3) {
members := a.LANMembers()
grok, err := consul.CanServersUnderstandProtocol(members, 3)
if err != nil {
a.logger.Printf("[ERR] agent: failed to check servers: %s", err)
continue
}
if !grok {
a.logger.Printf("[DEBUG] agent: skipping coordinate updates until servers are upgraded")
continue
}
var c *coordinate.Coordinate
var err error
if c, err = a.GetCoordinate(); err != nil {
c, err := a.GetCoordinate()
if err != nil {
a.logger.Printf("[ERR] agent: failed to get coordinate: %s", err)
continue
}

27
command/agent/agent_test.go

@ -1602,30 +1602,3 @@ func TestAgent_GetCoordinate(t *testing.T) {
check(true)
check(false)
}
func TestAgent_CanServersUnderstandProtocol(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)
defer agent.Shutdown()
min := uint8(consul.ProtocolVersionMin)
if !agent.CanServersUnderstandProtocol(min) {
t.Fatalf("should grok %d", min)
}
max := uint8(consul.ProtocolVersionMax)
if !agent.CanServersUnderstandProtocol(max) {
t.Fatalf("should grok %d", max)
}
current := uint8(config.Protocol)
if !agent.CanServersUnderstandProtocol(current) {
t.Fatalf("should grok %d", current)
}
future := max + 1
if agent.CanServersUnderstandProtocol(future) {
t.Fatalf("should not grok %d", future)
}
}

24
consul/util.go

@ -95,6 +95,30 @@ func ensurePath(path string, dir bool) error {
return os.MkdirAll(path, 0755)
}
// CanServersUnderstandProtocol checks to see if all the servers in the given
// list understand the given protocol version or higher. If there are no servers
// in the list then this will return false.
func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, error) {
numServers, numWhoGrok := 0, 0
for _, m := range members {
if m.Tags["role"] != "consul" {
continue
}
numServers++
vsn_str := m.Tags["vsn_max"]
vsn, err := strconv.Atoi(vsn_str)
if err != nil {
return false, err
}
if vsn >= int(version) {
numWhoGrok++
}
}
return (numServers > 0) && (numWhoGrok == numServers), nil
}
// Returns if a member is a consul server. Returns a bool,
// the datacenter, and the rpc port
func isConsulServer(m serf.Member) (bool, *serverParts) {

84
consul/util_test.go

@ -2,6 +2,7 @@ package consul
import (
"errors"
"fmt"
"net"
"regexp"
"testing"
@ -118,6 +119,89 @@ func TestIsPrivateIP(t *testing.T) {
}
}
func TestUtil_CanServersUnderstandProtocol(t *testing.T) {
var members []serf.Member
// All empty list cases should return false.
for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ {
grok, err := CanServersUnderstandProtocol(members, v)
if err != nil {
t.Fatalf("err: %v", err)
}
if grok {
t.Fatalf("empty list should always return false")
}
}
// Add a non-server member.
members = append(members, serf.Member{
Tags: map[string]string{
"vsn_max": fmt.Sprintf("%d", ProtocolVersionMax),
},
})
// Make sure it doesn't get counted.
for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ {
grok, err := CanServersUnderstandProtocol(members, v)
if err != nil {
t.Fatalf("err: %v", err)
}
if grok {
t.Fatalf("non-server members should not be counted")
}
}
// Add a server member.
members = append(members, serf.Member{
Tags: map[string]string{
"role": "consul",
"vsn_max": fmt.Sprintf("%d", ProtocolVersionMax),
},
})
// Now it should report that it understands.
for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ {
grok, err := CanServersUnderstandProtocol(members, v)
if err != nil {
t.Fatalf("err: %v", err)
}
if !grok {
t.Fatalf("server should grok")
}
}
// Nobody should understand anything from the future.
for v := uint8(ProtocolVersionMax + 1); v <= uint8(ProtocolVersionMax+10); v++ {
grok, err := CanServersUnderstandProtocol(members, v)
if err != nil {
t.Fatalf("err: %v", err)
}
if grok {
t.Fatalf("server should not grok")
}
}
// Add an older server.
members = append(members, serf.Member{
Tags: map[string]string{
"role": "consul",
"vsn_max": fmt.Sprintf("%d", ProtocolVersionMax-1),
},
})
// The servers should no longer understand the max version.
for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ {
grok, err := CanServersUnderstandProtocol(members, v)
if err != nil {
t.Fatalf("err: %v", err)
}
expected := v < ProtocolVersionMax
if grok != expected {
t.Fatalf("bad: %v != %v", grok, expected)
}
}
}
func TestIsConsulServer(t *testing.T) {
m := serf.Member{
Name: "foo",

Loading…
Cancel
Save