From 51e3ca6cbbd5f5aae1af0527bde637d520dc46ec Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Mon, 25 Jan 2021 13:24:32 -0600 Subject: [PATCH] server: use the presense of stored federation state data as a sign that we already activated the federation state feature flag (#9519) This way we only have to wait for the serf barrier to pass once before we can make use of federation state APIs Without this patch every restart needs to re-compute the change. --- .changelog/9519.txt | 3 + agent/consul/helper_test.go | 10 +++ agent/consul/leader.go | 6 +- agent/consul/leader_federation_state_ae.go | 10 +++ .../consul/leader_federation_state_ae_test.go | 77 +++++++++++++++++++ 5 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 .changelog/9519.txt diff --git a/.changelog/9519.txt b/.changelog/9519.txt new file mode 100644 index 0000000000..204d6b806f --- /dev/null +++ b/.changelog/9519.txt @@ -0,0 +1,3 @@ +```release-note:improvement +server: use the presense of stored federation state data as a sign that we already activated the federation state feature flag +``` diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index b4137a1131..1a0919527c 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -186,6 +186,16 @@ func waitForNewACLReplication(t *testing.T, server *Server, expectedReplicationT }) } +func waitForFederationStateFeature(t *testing.T, server *Server) { + t.Helper() + + retry.Run(t, func(r *retry.R) { + require.True(r, server.DatacenterSupportsFederationStates()) + }) + + require.True(t, server.DatacenterSupportsFederationStates()) +} + func seeEachOther(a, b []serf.Member, addra, addrb string) bool { return serfMembersContains(a, addrb) && serfMembersContains(b, addra) } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index c1a15d1853..6ecb18d005 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -1484,6 +1484,10 @@ func (s *Server) reapTombstones(index uint64) { } } +func (s *Server) setDatacenterSupportsFederationStates() { + atomic.StoreInt32(&s.dcSupportsFederationStates, 1) +} + func (s *Server) DatacenterSupportsFederationStates() bool { if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 { return true @@ -1508,7 +1512,7 @@ func (s *Server) DatacenterSupportsFederationStates() bool { s.router.CheckServers(s.config.Datacenter, state.update) if state.supported && state.found { - atomic.StoreInt32(&s.dcSupportsFederationStates, 1) + s.setDatacenterSupportsFederationStates() return true } diff --git a/agent/consul/leader_federation_state_ae.go b/agent/consul/leader_federation_state_ae.go index 4e5a8a45b2..5adf08f347 100644 --- a/agent/consul/leader_federation_state_ae.go +++ b/agent/consul/leader_federation_state_ae.go @@ -17,6 +17,16 @@ const ( ) func (s *Server) startFederationStateAntiEntropy() { + // Check to see if we can skip waiting for serf feature detection below. + if !s.DatacenterSupportsFederationStates() { + _, fedStates, err := s.fsm.State().FederationStateList(nil) + if err != nil { + s.logger.Warn("Failed to check for existing federation states and activate the feature flag quicker; skipping this optimization", "error", err) + } else if len(fedStates) > 0 { + s.setDatacenterSupportsFederationStates() + } + } + if s.config.DisableFederationStateAntiEntropy { return } diff --git a/agent/consul/leader_federation_state_ae_test.go b/agent/consul/leader_federation_state_ae_test.go index 2c06592573..61a40891ca 100644 --- a/agent/consul/leader_federation_state_ae_test.go +++ b/agent/consul/leader_federation_state_ae_test.go @@ -13,6 +13,83 @@ import ( "github.com/stretchr/testify/require" ) +func TestLeader_FederationStateAntiEntropy_FeatureIsStickyEvenIfSerfTagsRegress(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + // We test this by having two datacenters with one server each. They + // initially come up and pass the serf barrier, then we power them both + // off. We leave the primary off permanently, and then we stand up the + // secondary. Hopefully it should transition to allow federation states. + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + waitForLeaderEstablishment(t, s1) + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.FederationStateReplicationRate = 100 + c.FederationStateReplicationBurst = 100 + c.FederationStateReplicationApplyLimit = 1000000 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + codec2 := rpcClient(t, s2) + defer codec2.Close() + + waitForLeaderEstablishment(t, s2) + + // Create the WAN link + joinWAN(t, s2, s1) + waitForLeaderEstablishment(t, s1) + waitForLeaderEstablishment(t, s2) + + waitForFederationStateFeature(t, s1) + waitForFederationStateFeature(t, s2) + + // Wait for everybody's AE to complete. + retry.Run(t, func(r *retry.R) { + _, states, err := s1.fsm.State().FederationStateList(nil) + require.NoError(r, err) + require.Len(r, states, 2) + }) + + // Shutdown s1 and s2. + s1.Shutdown() + s2.Shutdown() + + // Restart just s2 + + dir2new, s2new := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.FederationStateReplicationRate = 100 + c.FederationStateReplicationBurst = 100 + c.FederationStateReplicationApplyLimit = 1000000 + + c.DataDir = s2.config.DataDir + c.NodeName = s2.config.NodeName + c.NodeID = s2.config.NodeID + }) + defer os.RemoveAll(dir2new) + defer s2new.Shutdown() + + waitForLeaderEstablishment(t, s2new) + + // It should be able to transition without connectivity to the primary. + waitForFederationStateFeature(t, s2new) +} + func TestLeader_FederationStateAntiEntropy_BlockingQuery(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short")