Code review feedback, fixed major logic bug

pull/3154/head
Preetha Appan 2017-06-16 10:49:54 -05:00
parent 72af7b9bc4
commit 44f5086873
4 changed files with 13 additions and 18 deletions

View File

@ -183,7 +183,6 @@ func (s *Server) establishLeadership() error {
s.startAutopilot()
// Set consistent read readiness state
s.setConsistentReadReady()
return nil
@ -202,7 +201,6 @@ func (s *Server) revokeLeadership() error {
return err
}
// Clear state about readiness to serve consistent reads
s.resetConsistentReadReady()
s.stopAutopilot()

View File

@ -32,10 +32,6 @@ func TestLeader_RegisterMember(t *testing.T) {
testrpc.WaitForLeader(t, s1.RPC, "dc1")
if !s1.isReadyForConsistentReads() {
t.Fatalf("Expected server to be ready for consistent reads ")
}
// Client should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
@ -496,9 +492,12 @@ func TestLeader_LeftLeader(t *testing.T) {
if leader == nil {
t.Fatalf("Should have a leader")
}
if !leader.isReadyForConsistentReads() {
t.Fatalf("Expected leader to be ready for consistent reads ")
}
leader.Leave()
if leader.isReadyForConsistentReads() {
t.Fatalf("Expectected consistent read state to be false ")
t.Fatalf("Expected consistent read state to be false ")
}
leader.Shutdown()
time.Sleep(100 * time.Millisecond)

View File

@ -434,8 +434,7 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) {
func (s *Server) consistentRead() error {
defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now())
future := s.raft.VerifyLeader()
err := future.Error()
if err != nil {
if err := future.Error(); err != nil {
return err //fail fast if leader verification fails
}
// poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds
@ -443,9 +442,9 @@ func (s *Server) consistentRead() error {
return nil
}
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
deadline := time.Now().Add(jitter)
deadline := time.Now().Add(s.config.RPCHoldTimeout)
for time.Now().After(deadline) {
for time.Now().Before(deadline) {
select {
case <-time.After(jitter):

View File

@ -14,9 +14,8 @@ import (
"reflect"
"strconv"
"sync"
"time"
"sync/atomic"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/agent"
@ -143,8 +142,8 @@ type Server struct {
// updated
reconcileCh chan serf.Member
// used to track when the server is ready to serve consistent reads
readyForConsistentReads uint32
// used to track when the server is ready to serve consistent reads, updated atomically
readyForConsistentReads int32
// router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas.
@ -1009,17 +1008,17 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreUint32(&s.readyForConsistentReads, 1)
atomic.StoreInt32(&s.readyForConsistentReads, 1)
}
// Atomically reset readiness state flag on leadership revoke
func (s *Server) resetConsistentReadReady() {
atomic.StoreUint32(&s.readyForConsistentReads, 0)
atomic.StoreInt32(&s.readyForConsistentReads, 0)
}
// Returns true if this server is ready to serve consistent reads
func (s *Server) isReadyForConsistentReads() bool {
return atomic.LoadUint32(&s.readyForConsistentReads) > 0
return atomic.LoadInt32(&s.readyForConsistentReads) > 0
}
// peersInfoContent is used to help operators understand what happened to the