diff --git a/agent/agent.go b/agent/agent.go index 852d742abe..b151108a52 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -370,6 +370,8 @@ type Agent struct { // enterpriseAgent embeds fields that we only access in consul-enterprise builds enterpriseAgent + + coalesceTimerShim func(inputCh chan *config.FileWatcherEvent, coalesceDuration time.Duration) chan []config.FileWatcherEvent } // New process the desired options and creates a new Agent. @@ -470,6 +472,7 @@ func New(bd BaseDeps) (*Agent, error) { a.configFileWatcher = w } + a.coalesceTimerShim = coalesceTimer return &a, nil } @@ -730,33 +733,11 @@ func (a *Agent) Start(ctx context.Context) error { a.baseDeps.Logger.Debug("starting file watcher") a.FileWatcher.Start(context.Background()) go func() { - var coalesceTimer *time.Timer = nil - eventsCount := 0 - sendCh := make(chan struct{}) - for { - select { - case event, ok := <-a.FileWatcher.EventsCh: - if !ok { - return - } - a.baseDeps.Logger.Debug("auto-reload event received", "event-file", event.Filename) - if coalesceTimer == nil { - coalesceTimer = time.AfterFunc(1*time.Millisecond, func() { - // This runs in another goroutine so we can't just do the send - // directly here as access to snap is racy. Instead, signal the main - // loop above. - sendCh <- struct{}{} - }) - } - eventsCount++ - case <-sendCh: - a.baseDeps.Logger.Debug("auto-reload config triggered", "num-events", eventsCount) - coalesceTimer = nil - eventsCount = 0 - err := a.ReloadConfig(true) - if err != nil { - a.baseDeps.Logger.Error("error loading config", "error", err) - } + for events := range a.coalesceTimerShim(a.FileWatcher.EventsCh, 1*time.Millisecond) { + a.baseDeps.Logger.Debug("auto-reload config triggered", "num-events", len(events)) + err := a.ReloadConfig(true) + if err != nil { + a.baseDeps.Logger.Error("error loading config", "error", err) } } } @@ -766,6 +747,38 @@ func (a *Agent) Start(ctx context.Context) error { return nil } +func coalesceTimer(inputCh chan *config.FileWatcherEvent, coalesceDuration time.Duration) chan []config.FileWatcherEvent { + var coalesceTimer *time.Timer = nil + sendCh := make(chan struct{}) + FileWatcherEvents := make([]config.FileWatcherEvent, 0) + FileWatcherEventsCh := make(chan []config.FileWatcherEvent) + go func() { + for { + select { + case event, ok := <-inputCh: + if !ok { + close(FileWatcherEventsCh) + return + } + FileWatcherEvents = append(FileWatcherEvents, *event) + if coalesceTimer == nil { + coalesceTimer = time.AfterFunc(coalesceDuration, func() { + // This runs in another goroutine so we can't just do the send + // directly here as access to snap is racy. Instead, signal the main + // loop above. + sendCh <- struct{}{} + }) + } + case <-sendCh: + coalesceTimer = nil + FileWatcherEventsCh <- FileWatcherEvents + FileWatcherEvents = make([]config.FileWatcherEvent, 0) + } + } + }() + return FileWatcherEventsCh +} + var Gauges = []prometheus.GaugeDefinition{ { Name: []string{"version"}, diff --git a/agent/agent_test.go b/agent/agent_test.go index ba82f127f6..277fc98a1d 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -5650,7 +5650,7 @@ func TestAgent_AutoReloadDoReload_WhenKeyThenCertUpdated(t *testing.T) { `), 0600)) srv := StartTestAgent(t, TestAgent{Name: "TestAgent-Server", HCL: hclConfig, configFiles: []string{configFile}}) - + srv.coalesceTimerShim = testCoalesceTimer defer srv.Shutdown() testrpc.WaitForTestAgent(t, srv.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken)) @@ -5735,113 +5735,12 @@ func TestAgent_AutoReloadDoReload_WhenKeyThenCertUpdated(t *testing.T) { }) } -func Test_coalesceTimerTwoPeriods(t *testing.T) { - - certsDir := testutil.TempDir(t, "auto-config") - - // write some test TLS certificates out to the cfg dir - serverName := "server.dc1.consul" - signer, _, err := tlsutil.GeneratePrivateKey() - require.NoError(t, err) - - ca, _, err := tlsutil.GenerateCA(tlsutil.CAOpts{Signer: signer}) - require.NoError(t, err) - - cert, privateKey, err := tlsutil.GenerateCert(tlsutil.CertOpts{ - Signer: signer, - CA: ca, - Name: "Test Cert Name", - Days: 365, - DNSNames: []string{serverName}, - ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, - }) - require.NoError(t, err) - - certFile := filepath.Join(certsDir, "cert.pem") - caFile := filepath.Join(certsDir, "cacert.pem") - keyFile := filepath.Join(certsDir, "key.pem") - - require.NoError(t, ioutil.WriteFile(certFile, []byte(cert), 0600)) - require.NoError(t, ioutil.WriteFile(caFile, []byte(ca), 0600)) - require.NoError(t, ioutil.WriteFile(keyFile, []byte(privateKey), 0600)) - - // generate a gossip key - gossipKey := make([]byte, 32) - n, err := rand.Read(gossipKey) - require.NoError(t, err) - require.Equal(t, 32, n) - gossipKeyEncoded := base64.StdEncoding.EncodeToString(gossipKey) - - hclConfig := TestACLConfigWithParams(nil) - - configFile := testutil.TempDir(t, "config") + "/config.hcl" - require.NoError(t, ioutil.WriteFile(configFile, []byte(` - encrypt = "`+gossipKeyEncoded+`" - encrypt_verify_incoming = true - encrypt_verify_outgoing = true - verify_incoming = true - verify_outgoing = true - verify_server_hostname = true - ca_file = "`+caFile+`" - cert_file = "`+certFile+`" - key_file = "`+keyFile+`" - connect { enabled = true } - auto_reload_config = true - `), 0600)) - - coalesceInterval := 100 * time.Millisecond - testAgent := TestAgent{Name: "TestAgent-Server", HCL: hclConfig, configFiles: []string{configFile}, Config: &config.RuntimeConfig{ - AutoReloadConfigCoalesceInterval: coalesceInterval, - }} - srv := StartTestAgent(t, testAgent) - defer srv.Shutdown() - - testrpc.WaitForTestAgent(t, srv.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken)) - - cert1Pub := srv.tlsConfigurator.Cert().Certificate - cert1Key := srv.tlsConfigurator.Cert().PrivateKey - - certNew, privateKeyNew, err := tlsutil.GenerateCert(tlsutil.CertOpts{ - Signer: signer, - CA: ca, - Name: "Test Cert Name", - Days: 365, - DNSNames: []string{serverName}, - ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth}, - }) - require.NoError(t, err) - certFileNew := filepath.Join(certsDir, "cert_new.pem") - require.NoError(t, ioutil.WriteFile(certFileNew, []byte(certNew), 0600)) - require.NoError(t, ioutil.WriteFile(configFile, []byte(` - encrypt = "`+gossipKeyEncoded+`" - encrypt_verify_incoming = true - encrypt_verify_outgoing = true - verify_incoming = true - verify_outgoing = true - verify_server_hostname = true - ca_file = "`+caFile+`" - cert_file = "`+certFileNew+`" - key_file = "`+keyFile+`" - connect { enabled = true } - auto_reload_config = true - `), 0600)) - - // cert should not change as we did not update the associated key - time.Sleep(coalesceInterval * 2) - retry.Run(t, func(r *retry.R) { - cert := srv.tlsConfigurator.Cert() - require.NotNil(r, cert) - require.Equal(r, cert1Pub, cert.Certificate) - require.Equal(r, cert1Key, cert.PrivateKey) - }) - - require.NoError(t, ioutil.WriteFile(keyFile, []byte(privateKeyNew), 0600)) - - // cert should change as we did not update the associated key - time.Sleep(coalesceInterval * 2) - retry.Run(t, func(r *retry.R) { - require.NotEqual(r, cert1Pub, srv.tlsConfigurator.Cert().Certificate) - require.NotEqual(r, cert1Key, srv.tlsConfigurator.Cert().PrivateKey) - }) - +func testCoalesceTimer(inputCh chan *config.FileWatcherEvent, _ time.Duration) chan []config.FileWatcherEvent { + ch := make(chan []config.FileWatcherEvent) + go func() { + for evt := range inputCh { + ch <- []config.FileWatcherEvent{*evt} + } + }() + return ch }