Browse Source

extract coaelsce Timer and add a shim for testing

pull/12548/head
Dhia Ayachi 3 years ago
parent
commit
0ab86012a4
  1. 67
      agent/agent.go
  2. 119
      agent/agent_test.go

67
agent/agent.go

@ -370,6 +370,8 @@ type Agent struct {
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
enterpriseAgent
coalesceTimerShim func(inputCh chan *config.FileWatcherEvent, coalesceDuration time.Duration) chan []config.FileWatcherEvent
}
// New process the desired options and creates a new Agent.
@ -470,6 +472,7 @@ func New(bd BaseDeps) (*Agent, error) {
a.configFileWatcher = w
}
a.coalesceTimerShim = coalesceTimer
return &a, nil
}
@ -730,33 +733,11 @@ func (a *Agent) Start(ctx context.Context) error {
a.baseDeps.Logger.Debug("starting file watcher")
a.FileWatcher.Start(context.Background())
go func() {
var coalesceTimer *time.Timer = nil
eventsCount := 0
sendCh := make(chan struct{})
for {
select {
case event, ok := <-a.FileWatcher.EventsCh:
if !ok {
return
}
a.baseDeps.Logger.Debug("auto-reload event received", "event-file", event.Filename)
if coalesceTimer == nil {
coalesceTimer = time.AfterFunc(1*time.Millisecond, func() {
// This runs in another goroutine so we can't just do the send
// directly here as access to snap is racy. Instead, signal the main
// loop above.
sendCh <- struct{}{}
})
}
eventsCount++
case <-sendCh:
a.baseDeps.Logger.Debug("auto-reload config triggered", "num-events", eventsCount)
coalesceTimer = nil
eventsCount = 0
err := a.ReloadConfig(true)
if err != nil {
a.baseDeps.Logger.Error("error loading config", "error", err)
}
for events := range a.coalesceTimerShim(a.FileWatcher.EventsCh, 1*time.Millisecond) {
a.baseDeps.Logger.Debug("auto-reload config triggered", "num-events", len(events))
err := a.ReloadConfig(true)
if err != nil {
a.baseDeps.Logger.Error("error loading config", "error", err)
}
}
}
@ -766,6 +747,38 @@ func (a *Agent) Start(ctx context.Context) error {
return nil
}
func coalesceTimer(inputCh chan *config.FileWatcherEvent, coalesceDuration time.Duration) chan []config.FileWatcherEvent {
var coalesceTimer *time.Timer = nil
sendCh := make(chan struct{})
FileWatcherEvents := make([]config.FileWatcherEvent, 0)
FileWatcherEventsCh := make(chan []config.FileWatcherEvent)
go func() {
for {
select {
case event, ok := <-inputCh:
if !ok {
close(FileWatcherEventsCh)
return
}
FileWatcherEvents = append(FileWatcherEvents, *event)
if coalesceTimer == nil {
coalesceTimer = time.AfterFunc(coalesceDuration, func() {
// This runs in another goroutine so we can't just do the send
// directly here as access to snap is racy. Instead, signal the main
// loop above.
sendCh <- struct{}{}
})
}
case <-sendCh:
coalesceTimer = nil
FileWatcherEventsCh <- FileWatcherEvents
FileWatcherEvents = make([]config.FileWatcherEvent, 0)
}
}
}()
return FileWatcherEventsCh
}
var Gauges = []prometheus.GaugeDefinition{
{
Name: []string{"version"},

119
agent/agent_test.go

@ -5650,7 +5650,7 @@ func TestAgent_AutoReloadDoReload_WhenKeyThenCertUpdated(t *testing.T) {
`), 0600))
srv := StartTestAgent(t, TestAgent{Name: "TestAgent-Server", HCL: hclConfig, configFiles: []string{configFile}})
srv.coalesceTimerShim = testCoalesceTimer
defer srv.Shutdown()
testrpc.WaitForTestAgent(t, srv.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken))
@ -5735,113 +5735,12 @@ func TestAgent_AutoReloadDoReload_WhenKeyThenCertUpdated(t *testing.T) {
})
}
func Test_coalesceTimerTwoPeriods(t *testing.T) {
certsDir := testutil.TempDir(t, "auto-config")
// write some test TLS certificates out to the cfg dir
serverName := "server.dc1.consul"
signer, _, err := tlsutil.GeneratePrivateKey()
require.NoError(t, err)
ca, _, err := tlsutil.GenerateCA(tlsutil.CAOpts{Signer: signer})
require.NoError(t, err)
cert, privateKey, err := tlsutil.GenerateCert(tlsutil.CertOpts{
Signer: signer,
CA: ca,
Name: "Test Cert Name",
Days: 365,
DNSNames: []string{serverName},
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
})
require.NoError(t, err)
certFile := filepath.Join(certsDir, "cert.pem")
caFile := filepath.Join(certsDir, "cacert.pem")
keyFile := filepath.Join(certsDir, "key.pem")
require.NoError(t, ioutil.WriteFile(certFile, []byte(cert), 0600))
require.NoError(t, ioutil.WriteFile(caFile, []byte(ca), 0600))
require.NoError(t, ioutil.WriteFile(keyFile, []byte(privateKey), 0600))
// generate a gossip key
gossipKey := make([]byte, 32)
n, err := rand.Read(gossipKey)
require.NoError(t, err)
require.Equal(t, 32, n)
gossipKeyEncoded := base64.StdEncoding.EncodeToString(gossipKey)
hclConfig := TestACLConfigWithParams(nil)
configFile := testutil.TempDir(t, "config") + "/config.hcl"
require.NoError(t, ioutil.WriteFile(configFile, []byte(`
encrypt = "`+gossipKeyEncoded+`"
encrypt_verify_incoming = true
encrypt_verify_outgoing = true
verify_incoming = true
verify_outgoing = true
verify_server_hostname = true
ca_file = "`+caFile+`"
cert_file = "`+certFile+`"
key_file = "`+keyFile+`"
connect { enabled = true }
auto_reload_config = true
`), 0600))
coalesceInterval := 100 * time.Millisecond
testAgent := TestAgent{Name: "TestAgent-Server", HCL: hclConfig, configFiles: []string{configFile}, Config: &config.RuntimeConfig{
AutoReloadConfigCoalesceInterval: coalesceInterval,
}}
srv := StartTestAgent(t, testAgent)
defer srv.Shutdown()
testrpc.WaitForTestAgent(t, srv.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken))
cert1Pub := srv.tlsConfigurator.Cert().Certificate
cert1Key := srv.tlsConfigurator.Cert().PrivateKey
certNew, privateKeyNew, err := tlsutil.GenerateCert(tlsutil.CertOpts{
Signer: signer,
CA: ca,
Name: "Test Cert Name",
Days: 365,
DNSNames: []string{serverName},
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
})
require.NoError(t, err)
certFileNew := filepath.Join(certsDir, "cert_new.pem")
require.NoError(t, ioutil.WriteFile(certFileNew, []byte(certNew), 0600))
require.NoError(t, ioutil.WriteFile(configFile, []byte(`
encrypt = "`+gossipKeyEncoded+`"
encrypt_verify_incoming = true
encrypt_verify_outgoing = true
verify_incoming = true
verify_outgoing = true
verify_server_hostname = true
ca_file = "`+caFile+`"
cert_file = "`+certFileNew+`"
key_file = "`+keyFile+`"
connect { enabled = true }
auto_reload_config = true
`), 0600))
// cert should not change as we did not update the associated key
time.Sleep(coalesceInterval * 2)
retry.Run(t, func(r *retry.R) {
cert := srv.tlsConfigurator.Cert()
require.NotNil(r, cert)
require.Equal(r, cert1Pub, cert.Certificate)
require.Equal(r, cert1Key, cert.PrivateKey)
})
require.NoError(t, ioutil.WriteFile(keyFile, []byte(privateKeyNew), 0600))
// cert should change as we did not update the associated key
time.Sleep(coalesceInterval * 2)
retry.Run(t, func(r *retry.R) {
require.NotEqual(r, cert1Pub, srv.tlsConfigurator.Cert().Certificate)
require.NotEqual(r, cert1Key, srv.tlsConfigurator.Cert().PrivateKey)
})
func testCoalesceTimer(inputCh chan *config.FileWatcherEvent, _ time.Duration) chan []config.FileWatcherEvent {
ch := make(chan []config.FileWatcherEvent)
go func() {
for evt := range inputCh {
ch <- []config.FileWatcherEvent{*evt}
}
}()
return ch
}

Loading…
Cancel
Save