diff --git a/agent/consul/leader.go b/agent/consul/leader.go index a08ebef267..431aacc779 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -183,7 +183,6 @@ func (s *Server) establishLeadership() error { s.startAutopilot() - // Set consistent read readiness state s.setConsistentReadReady() return nil @@ -202,7 +201,6 @@ func (s *Server) revokeLeadership() error { return err } - // Clear state about readiness to serve consistent reads s.resetConsistentReadReady() s.stopAutopilot() diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 99718359bb..97a1229c67 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -32,10 +32,6 @@ func TestLeader_RegisterMember(t *testing.T) { testrpc.WaitForLeader(t, s1.RPC, "dc1") - if !s1.isReadyForConsistentReads() { - t.Fatalf("Expected server to be ready for consistent reads ") - } - // Client should be registered state := s1.fsm.State() retry.Run(t, func(r *retry.R) { @@ -496,9 +492,12 @@ func TestLeader_LeftLeader(t *testing.T) { if leader == nil { t.Fatalf("Should have a leader") } + if !leader.isReadyForConsistentReads() { + t.Fatalf("Expected leader to be ready for consistent reads ") + } leader.Leave() if leader.isReadyForConsistentReads() { - t.Fatalf("Expectected consistent read state to be false ") + t.Fatalf("Expected consistent read state to be false ") } leader.Shutdown() time.Sleep(100 * time.Millisecond) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 36edd17d33..3f8c976568 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -434,8 +434,7 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) { func (s *Server) consistentRead() error { defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now()) future := s.raft.VerifyLeader() - err := future.Error() - if err != nil { + if err := future.Error(); err != nil { return err //fail fast if leader verification fails } // poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds @@ -443,9 +442,9 @@ func (s *Server) consistentRead() error { return nil } jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction) - deadline := time.Now().Add(jitter) + deadline := time.Now().Add(s.config.RPCHoldTimeout) - for time.Now().After(deadline) { + for time.Now().Before(deadline) { select { case <-time.After(jitter): diff --git a/agent/consul/server.go b/agent/consul/server.go index 4bc1ea84d8..902548c063 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -14,9 +14,8 @@ import ( "reflect" "strconv" "sync" - "time" - "sync/atomic" + "time" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/agent" @@ -143,8 +142,8 @@ type Server struct { // updated reconcileCh chan serf.Member - // used to track when the server is ready to serve consistent reads - readyForConsistentReads uint32 + // used to track when the server is ready to serve consistent reads, updated atomically + readyForConsistentReads int32 // router is used to map out Consul servers in the WAN and in Consul // Enterprise user-defined areas. @@ -1009,17 +1008,17 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) { // Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write func (s *Server) setConsistentReadReady() { - atomic.StoreUint32(&s.readyForConsistentReads, 1) + atomic.StoreInt32(&s.readyForConsistentReads, 1) } // Atomically reset readiness state flag on leadership revoke func (s *Server) resetConsistentReadReady() { - atomic.StoreUint32(&s.readyForConsistentReads, 0) + atomic.StoreInt32(&s.readyForConsistentReads, 0) } // Returns true if this server is ready to serve consistent reads func (s *Server) isReadyForConsistentReads() bool { - return atomic.LoadUint32(&s.readyForConsistentReads) > 0 + return atomic.LoadInt32(&s.readyForConsistentReads) > 0 } // peersInfoContent is used to help operators understand what happened to the