diff --git a/agent/consul/leader.go b/agent/consul/leader.go
index a1c1ce50c9..431aacc779 100644
--- a/agent/consul/leader.go
+++ b/agent/consul/leader.go
@@ -183,6 +183,8 @@ func (s *Server) establishLeadership() error {
 
 	s.startAutopilot()
 
+	s.setConsistentReadReady()
+
 	return nil
 }
 
@@ -199,6 +201,8 @@ func (s *Server) revokeLeadership() error {
 		return err
 	}
 
+	s.resetConsistentReadReady()
+
 	s.stopAutopilot()
 
 	return nil
diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go
index 9a2d218d64..97a1229c67 100644
--- a/agent/consul/leader_test.go
+++ b/agent/consul/leader_test.go
@@ -492,7 +492,13 @@ func TestLeader_LeftLeader(t *testing.T) {
 	if leader == nil {
 		t.Fatalf("Should have a leader")
 	}
+	if !leader.isReadyForConsistentReads() {
+		t.Fatalf("Expected leader to be ready for consistent reads")
+	}
 	leader.Leave()
+	if leader.isReadyForConsistentReads() {
+		t.Fatalf("Expected consistent read state to be false")
+	}
 	leader.Shutdown()
 	time.Sleep(100 * time.Millisecond)
diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go
index 5ea9c8e496..3f8c976568 100644
--- a/agent/consul/rpc.go
+++ b/agent/consul/rpc.go
@@ -434,5 +434,30 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) {
 func (s *Server) consistentRead() error {
 	defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now())
 	future := s.raft.VerifyLeader()
-	return future.Error()
+	if err := future.Error(); err != nil {
+		return err // fail fast if leader verification fails
+	}
+	// Poll for consistent read readiness, waiting up to RPCHoldTimeout.
+	if s.isReadyForConsistentReads() {
+		return nil
+	}
+	jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
+	deadline := time.Now().Add(s.config.RPCHoldTimeout)
+
+	for time.Now().Before(deadline) {
+
+		select {
+		case <-time.After(jitter):
+			// Drop through and check before we loop again.
+
+		case <-s.shutdownCh:
+			return fmt.Errorf("shutdown waiting for leader")
+		}
+
+		if s.isReadyForConsistentReads() {
+			return nil
+		}
+	}
+
+	return structs.ErrNotReadyForConsistentReads
 }
diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go
index 6e08d67e19..9599679873 100644
--- a/agent/consul/rpc_test.go
+++ b/agent/consul/rpc_test.go
@@ -163,3 +163,41 @@ func TestRPC_blockingQuery(t *testing.T) {
 		}
 	}
 }
+
+func TestReadyForConsistentReads(t *testing.T) {
+	dir, s := testServerWithConfig(t, func(c *Config) {
+		c.RPCHoldTimeout = 2 * time.Millisecond
+	})
+	defer os.RemoveAll(dir)
+	defer s.Shutdown()
+
+	testrpc.WaitForLeader(t, s.RPC, "dc1")
+
+	if !s.isReadyForConsistentReads() {
+		t.Fatal("Server should be ready for consistent reads")
+	}
+
+	s.resetConsistentReadReady()
+
+	setConsistentFunc := func() {
+		time.Sleep(3 * time.Millisecond)
+		s.setConsistentReadReady()
+	}
+
+	go setConsistentFunc()
+
+	// Give the goroutine above time to finish.
+	waitUntil := time.Now().Add(5 * time.Millisecond)
+	err := s.consistentRead()
+	if err == nil || err.Error() != "Not ready to serve consistent reads" {
+		t.Fatal("Server should NOT be ready for consistent reads")
+	}
+	for time.Now().Before(waitUntil) && err != nil {
+		err = s.consistentRead()
+	}
+
+	if err != nil {
+		t.Fatalf("Expected server to be ready for consistent reads, got error %v", err)
+	}
+
+}
diff --git a/agent/consul/server.go b/agent/consul/server.go
index 4a4661a645..9d1c9741f1 100644
--- a/agent/consul/server.go
+++ b/agent/consul/server.go
@@ -14,6 +14,7 @@ import (
 	"reflect"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/hashicorp/consul/acl"
@@ -141,6 +142,9 @@ type Server struct {
 	// updated
 	reconcileCh chan serf.Member
 
+	// used to track when the server is ready to serve consistent reads, updated atomically
+	readyForConsistentReads int32
+
 	// router is used to map out Consul servers in the WAN and in Consul
 	// Enterprise user-defined areas.
 	router *servers.Router
@@ -1002,6 +1006,21 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
 	return s.serfWAN.GetCoordinate()
 }
 
+// Atomically sets a readiness state flag when leadership is obtained, to indicate that the server is past its barrier write
+func (s *Server) setConsistentReadReady() {
+	atomic.StoreInt32(&s.readyForConsistentReads, 1)
+}
+
+// Atomically resets the readiness state flag when leadership is revoked
+func (s *Server) resetConsistentReadReady() {
+	atomic.StoreInt32(&s.readyForConsistentReads, 0)
+}
+
+// Returns true if this server is ready to serve consistent reads
+func (s *Server) isReadyForConsistentReads() bool {
+	return atomic.LoadInt32(&s.readyForConsistentReads) == 1
+}
+
 // peersInfoContent is used to help operators understand what happened to the
 // peers.json file. This is written to a file called peers.info in the same
 // location.
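
Note on the pattern: the server.go and rpc.go hunks combine an atomically updated readiness flag with a deadline-bounded, jittered poll. Below is a minimal, self-contained sketch of that pattern — not Consul's actual code. The `jitterFraction` constant and the `rand`-based stagger here are stand-ins for Consul's `jitterFraction` and `lib.RandomStagger`, and leader verification is elided.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync/atomic"
	"time"
)

var errNotReady = errors.New("Not ready to serve consistent reads")

// server mimics the relevant slice of Consul's Server state: an atomic
// readiness flag, a hold timeout, and a shutdown channel.
type server struct {
	readyForConsistentReads int32
	rpcHoldTimeout          time.Duration
	shutdownCh              chan struct{}
}

func (s *server) setConsistentReadReady()   { atomic.StoreInt32(&s.readyForConsistentReads, 1) }
func (s *server) resetConsistentReadReady() { atomic.StoreInt32(&s.readyForConsistentReads, 0) }
func (s *server) isReadyForConsistentReads() bool {
	return atomic.LoadInt32(&s.readyForConsistentReads) == 1
}

// consistentRead polls the flag at a jittered interval until it is set,
// the hold timeout elapses, or the server shuts down -- the same shape
// as the rpc.go change above.
func (s *server) consistentRead() error {
	if s.isReadyForConsistentReads() {
		return nil
	}
	const jitterFraction = 16 // stand-in for Consul's constant
	jitter := time.Duration(rand.Int63n(int64(s.rpcHoldTimeout / jitterFraction)))
	deadline := time.Now().Add(s.rpcHoldTimeout)

	for time.Now().Before(deadline) {
		select {
		case <-time.After(jitter):
			// Drop through and re-check the flag.
		case <-s.shutdownCh:
			return fmt.Errorf("shutdown waiting for leader")
		}
		if s.isReadyForConsistentReads() {
			return nil
		}
	}
	return errNotReady
}

func main() {
	s := &server{rpcHoldTimeout: 50 * time.Millisecond, shutdownCh: make(chan struct{})}
	go func() {
		time.Sleep(10 * time.Millisecond) // leadership barrier write completes
		s.setConsistentReadReady()
	}()
	fmt.Println(s.consistentRead()) // nil once the flag flips
}
```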
diff --git a/agent/consul/structs/structs.go b/agent/consul/structs/structs.go
index 12d88dda8d..3897c3c1b2 100644
--- a/agent/consul/structs/structs.go
+++ b/agent/consul/structs/structs.go
@@ -17,9 +17,10 @@ import (
 )
 
 var (
-	ErrNoLeader  = fmt.Errorf("No cluster leader")
-	ErrNoDCPath  = fmt.Errorf("No path to datacenter")
-	ErrNoServers = fmt.Errorf("No known Consul servers")
+	ErrNoLeader                   = fmt.Errorf("No cluster leader")
+	ErrNoDCPath                   = fmt.Errorf("No path to datacenter")
+	ErrNoServers                  = fmt.Errorf("No known Consul servers")
+	ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads")
 )
 
 type MessageType uint8
diff --git a/website/source/docs/internals/consensus.html.md b/website/source/docs/internals/consensus.html.md
index c9c8cbcb86..ac776c69c2 100644
--- a/website/source/docs/internals/consensus.html.md
+++ b/website/source/docs/internals/consensus.html.md
@@ -75,7 +75,9 @@
 is an opaque binary blob). The leader then writes the entry to durable storage and
 attempts to replicate to a quorum of followers. Once the log entry is considered
 *committed*, it can be *applied* to a finite state machine. The finite state machine
 is application specific; in Consul's case, we use
-[BoltDB](https://github.com/boltdb/bolt) to maintain cluster state.
+[MemDB](https://github.com/hashicorp/go-memdb) to maintain cluster state. Consul's writes
+block until they are both _committed_ and _applied_. This achieves read-after-write
+semantics when used with the [consistent](/api/index.html#consistent) mode for queries.
 
 Obviously, it would be undesirable to allow a replicated log to grow in an unbounded
 fashion. Raft provides a mechanism by which the current state is snapshotted and the
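
On the client side, the read-after-write behavior described in the docs hunk is opted into per request. A hedged usage sketch with the official Go client (`github.com/hashicorp/consul/api`), whose `QueryOptions.RequireConsistent` field corresponds to the HTTP API's `?consistent` query parameter; the key name is made up:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// RequireConsistent forces consistent mode: the read is verified
	// against the leader and, with this change, also waits for the
	// leader's barrier write, surfacing the new error if the server
	// is not yet ready.
	pair, meta, err := client.KV().Get("sample/key", &api.QueryOptions{
		RequireConsistent: true,
	})
	if err != nil {
		log.Fatal(err) // e.g. "Not ready to serve consistent reads"
	}
	fmt.Println(pair, meta.LastIndex)
}
```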