Browse Source

refactor & add better retry logic to NewTestAgent (#6363)

Fixes #6361
pull/6442/head
Sarah Adams 5 years ago committed by GitHub
parent
commit
4ed5515fca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      agent/agent_endpoint_test.go
  2. 44
      agent/agent_test.go
  3. 12
      agent/http_test.go
  4. 24
      agent/keyring_test.go
  5. 36
      agent/local/state_test.go
  6. 195
      agent/testagent.go
  7. 4
      command/monitor/monitor_test.go
  8. 2
      vendor/modules.txt

4
agent/agent_endpoint_test.go

@@ -3925,7 +3925,9 @@ func TestAgent_Monitor(t *testing.T) {
LogOutput: io.MultiWriter(os.Stderr, logWriter),
HCL: `node_name = "invalid!"`,
}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

44
agent/agent_test.go

@@ -97,7 +97,9 @@ func TestAgent_ConnectClusterIDConfig(t *testing.T) {
testFn := func() {
a := &TestAgent{Name: "test", HCL: tt.hcl}
a.ExpectConfigError = tt.wantPanic
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
cfg := a.consulConfig()
@@ -1221,7 +1223,9 @@ func TestAgent_RestoreServiceWithAliasCheck(t *testing.T) {
`
a := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir}
a.LogOutput = testutil.TestWriter(t)
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dataDir)
defer a.Shutdown()
@@ -1307,7 +1311,9 @@ node_name = "` + a.Config.NodeName + `"
// Reload and retain former NodeID and data directory.
a2 := &TestAgent{Name: t.Name(), HCL: futureHCL, DataDir: dataDir}
a2.LogOutput = testutil.TestWriter(t)
a2.Start(t)
if err := a2.Start(); err != nil {
t.Fatal(err)
}
defer a2.Shutdown()
a = nil
@@ -1589,7 +1595,9 @@ func TestAgent_HTTPCheck_EnableAgentTLSForChecks(t *testing.T) {
cert_file = "../test/client_certs/server.crt"
` + ca,
}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
health := &structs.HealthCheck{
@@ -1695,7 +1703,9 @@ func TestAgent_PersistService(t *testing.T) {
data_dir = "` + dataDir + `"
`
a := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dataDir)
defer a.Shutdown()
@@ -1761,7 +1771,9 @@ func TestAgent_PersistService(t *testing.T) {
// Should load it back during later start
a2 := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir}
a2.Start(t)
if err := a2.Start(); err != nil {
t.Fatal(err)
}
defer a2.Shutdown()
restored := a2.State.ServiceState(svc.ID)
@@ -1868,7 +1880,9 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
bootstrap = false
`
a := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
defer os.RemoveAll(dataDir)
@@ -1895,7 +1909,9 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
port = 9000
}
`, DataDir: dataDir}
a2.Start(t)
if err := a2.Start(); err != nil {
t.Fatal(err)
}
defer a2.Shutdown()
file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc1.ID))
@@ -1921,7 +1937,9 @@ func TestAgent_PersistCheck(t *testing.T) {
enable_script_checks = true
`
a := &TestAgent{Name: t.Name(), HCL: cfg, DataDir: dataDir}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dataDir)
defer a.Shutdown()
@@ -1993,7 +2011,9 @@ func TestAgent_PersistCheck(t *testing.T) {
// Should load it back during later start
a2 := &TestAgent{Name: t.Name() + "-a2", HCL: cfg, DataDir: dataDir}
a2.Start(t)
if err := a2.Start(); err != nil {
t.Fatal(err)
}
defer a2.Shutdown()
result := a2.State.Check(check.CheckID)
@@ -3024,7 +3044,9 @@ func TestAgent_reloadWatches(t *testing.T) {
func TestAgent_reloadWatchesHTTPS(t *testing.T) {
t.Parallel()
a := TestAgent{Name: t.Name(), UseTLS: true}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
// Normal watch with http addr set, should succeed

12
agent/http_test.go

@@ -140,7 +140,9 @@ func TestHTTPServer_H2(t *testing.T) {
ca_file = "../test/client_certs/rootca.crt"
`,
}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
// Make an HTTP/2-enabled client, using the API helpers to set
@@ -457,7 +459,9 @@ func TestHTTP_wrap_obfuscateLog(t *testing.T) {
t.Parallel()
buf := new(bytes.Buffer)
a := &TestAgent{Name: t.Name(), LogOutput: buf}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
handler := func(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@@ -1192,7 +1196,9 @@ func TestAllowedNets(t *testing.T) {
a := &TestAgent{
Name: t.Name(),
}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

24
agent/keyring_test.go

@@ -55,7 +55,9 @@ func TestAgent_LoadKeyrings(t *testing.T) {
// Server should auto-load LAN and WAN keyring files
t.Run("server with keys", func(t *testing.T) {
a2 := &TestAgent{Name: t.Name(), Key: key}
a2.Start(t)
if err := a2.Start(); err != nil {
t.Fatal(err)
}
defer a2.Shutdown()
c2 := a2.consulConfig()
@@ -85,7 +87,9 @@ func TestAgent_LoadKeyrings(t *testing.T) {
server = false
bootstrap = false
`, Key: key}
a3.Start(t)
if err := a3.Start(); err != nil {
t.Fatal(err)
}
defer a3.Shutdown()
c3 := a3.consulConfig()
@@ -137,7 +141,9 @@ func TestAgent_InmemKeyrings(t *testing.T) {
encrypt = "` + key + `"
disable_keyring_file = true
`}
a2.Start(t)
if err := a2.Start(); err != nil {
t.Fatal(err)
}
defer a2.Shutdown()
c2 := a2.consulConfig()
@@ -169,7 +175,9 @@ func TestAgent_InmemKeyrings(t *testing.T) {
bootstrap = false
disable_keyring_file = true
`}
a3.Start(t)
if err := a3.Start(); err != nil {
t.Fatal(err)
}
defer a3.Shutdown()
c3 := a3.consulConfig()
@@ -208,7 +216,9 @@ func TestAgent_InmemKeyrings(t *testing.T) {
disable_keyring_file = true
data_dir = "` + dir + `"
`}
a4.Start(t)
if err := a4.Start(); err != nil {
t.Fatal(err)
}
defer a4.Shutdown()
c4 := a4.consulConfig()
@@ -282,7 +292,9 @@ func TestAgentKeyring_ACL(t *testing.T) {
acl_master_token = "root"
acl_default_policy = "deny"
`, Key: key1}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
// List keys without access fails

36
agent/local/state_test.go

@@ -26,7 +26,9 @@ import (
func TestAgentAntiEntropy_Services(t *testing.T) {
t.Parallel()
a := &agent.TestAgent{Name: t.Name()}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@@ -259,7 +261,9 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
assert := assert.New(t)
a := &agent.TestAgent{Name: t.Name()}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@@ -417,7 +421,9 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
func TestAgent_ServiceWatchCh(t *testing.T) {
t.Parallel()
a := &agent.TestAgent{Name: t.Name()}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@@ -502,7 +508,9 @@ func TestAgent_ServiceWatchCh(t *testing.T) {
func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
t.Parallel()
a := &agent.TestAgent{Name: t.Name()}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@@ -767,7 +775,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
acl_master_token = "root"
acl_default_policy = "deny"
acl_enforce_version_8 = true`}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
@@ -914,7 +924,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
func TestAgentAntiEntropy_Checks(t *testing.T) {
t.Parallel()
a := &agent.TestAgent{Name: t.Name()}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@@ -1113,7 +1125,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
acl_master_token = "root"
acl_default_policy = "deny"
acl_enforce_version_8 = true`}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, dc)
@@ -1381,7 +1395,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
a := &agent.TestAgent{Name: t.Name(), HCL: `
check_update_interval = "500ms"
`}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
@@ -1589,7 +1605,9 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
node_meta {
somekey = "somevalue"
}`}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")

195
agent/testagent.go

@@ -6,7 +6,6 @@ import (
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
@@ -27,8 +26,6 @@ import (
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
)
func init() {
@@ -98,11 +95,18 @@ type TestAgent struct {
// caller should call Shutdown() to stop the agent and remove temporary
// directories.
func NewTestAgent(t *testing.T, name string, hcl string) *TestAgent {
a := &TestAgent{Name: name, HCL: hcl}
a.Start(t)
a := &TestAgent{Name: name, HCL: hcl, LogOutput: testutil.TestWriter(t)}
retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) {
if err := a.Start(); err != nil {
r.Fatal(err)
}
})
return a
}
// TODO: testing.T should be removed as a parameter, as it is not being used.
func NewUnstartedAgent(t *testing.T, name string, hcl string) (*Agent, error) {
c := TestConfig(config.Source{Name: name, Format: "hcl", Data: hcl})
a, err := New(c, nil)
@@ -112,10 +116,24 @@ func NewUnstartedAgent(t *testing.T, name string, hcl string) (*Agent, error) {
return a, nil
}
// Start starts a test agent. It fails the test if the agent could not be started.
func (a *TestAgent) Start(t *testing.T) *TestAgent {
require := require.New(t)
require.Nil(a.Agent, "TestAgent already started")
// Start starts a test agent. It returns an error if the agent could not be started.
// If no error is returned, the caller must call Shutdown() when finished.
func (a *TestAgent) Start() (err error) {
if a.Agent != nil {
return fmt.Errorf("TestAgent already started")
}
var cleanupTmpDir = func() {
// Clean out the data dir if we are responsible for it before we
// try again, since the old ports may have gotten written to
// the data dir, such as in the Raft configuration.
if a.DataDir != "" {
if err := os.RemoveAll(a.DataDir); err != nil {
fmt.Printf("%s Error resetting data dir: %s", a.Name, err)
}
}
}
var hclDataDir string
if a.DataDir == "" {
name := "agent"
@@ -124,79 +142,107 @@ func (a *TestAgent) Start(t *testing.T) *TestAgent {
}
name = strings.Replace(name, "/", "_", -1)
d, err := ioutil.TempDir(TempDir, name)
require.NoError(err, fmt.Sprintf("Error creating data dir %s: %s", filepath.Join(TempDir, name), err))
if err != nil {
return fmt.Errorf("Error creating data dir %s: %s", filepath.Join(TempDir, name), err)
}
hclDataDir = `data_dir = "` + d + `"`
}
var id string
for i := 10; i >= 0; i-- {
a.Config = TestConfig(
randomPortsSource(a.UseTLS),
config.Source{Name: a.Name, Format: "hcl", Data: a.HCL},
config.Source{Name: a.Name + ".data_dir", Format: "hcl", Data: hclDataDir},
)
id = string(a.Config.NodeID)
// write the keyring
if a.Key != "" {
writeKey := func(key, filename string) {
path := filepath.Join(a.Config.DataDir, filename)
err := initKeyring(path, key)
require.NoError(err, fmt.Sprintf("Error creating keyring %s: %s", path, err))
a.Config = TestConfig(
randomPortsSource(a.UseTLS),
config.Source{Name: a.Name, Format: "hcl", Data: a.HCL},
config.Source{Name: a.Name + ".data_dir", Format: "hcl", Data: hclDataDir},
)
// write the keyring
if a.Key != "" {
writeKey := func(key, filename string) error {
path := filepath.Join(a.Config.DataDir, filename)
if err := initKeyring(path, key); err != nil {
cleanupTmpDir()
return fmt.Errorf("Error creating keyring %s: %s", path, err)
}
writeKey(a.Key, SerfLANKeyring)
writeKey(a.Key, SerfWANKeyring)
return nil
}
logOutput := a.LogOutput
if logOutput == nil {
logOutput = testutil.TestWriter(t)
if err = writeKey(a.Key, SerfLANKeyring); err != nil {
cleanupTmpDir()
return err
}
if err = writeKey(a.Key, SerfWANKeyring); err != nil {
cleanupTmpDir()
return err
}
agentLogger := log.New(logOutput, a.Name+" - ", log.LstdFlags|log.Lmicroseconds)
agent, err := New(a.Config, agentLogger)
require.NoError(err, fmt.Sprintf("Error creating agent: %s", err))
agent.LogOutput = logOutput
agent.LogWriter = a.LogWriter
agent.MemSink = metrics.NewInmemSink(1*time.Second, time.Minute)
// we need the err var in the next exit condition
if err := agent.Start(); err == nil {
a.Agent = agent
break
} else if i == 0 {
require.Failf("%s %s Error starting agent: %s", id, a.Name, err)
} else if a.ExpectConfigError {
}
logOutput := a.LogOutput
if logOutput == nil {
// TODO: move this out of Start() and back into NewTestAgent,
// and make `logOutput = testutil.TestWriter(t)`
logOutput = os.Stderr
}
agentLogger := log.New(logOutput, a.Name+" - ", log.LstdFlags|log.Lmicroseconds)
agent, err := New(a.Config, agentLogger)
if err != nil {
cleanupTmpDir()
return fmt.Errorf("Error creating agent: %s", err)
}
agent.LogOutput = logOutput
agent.LogWriter = a.LogWriter
agent.MemSink = metrics.NewInmemSink(1*time.Second, time.Minute)
id := string(a.Config.NodeID)
if err := agent.Start(); err != nil {
cleanupTmpDir()
agent.ShutdownAgent()
agent.ShutdownEndpoints()
if a.ExpectConfigError {
// Panic the error since this can be caught if needed. Pretty gross way to
// detect errors but enough for now and this is a tiny edge case that I'd
// otherwise not have a way to test at all...
//
// TODO(sadams): This can be refactored away by returning an
// error here instead of panicing, removing the `ExpectConfigError`
// field from `TestAgent`, and having the test that uses this
// (TestAgent_ConnectClusterIDConfig) check for an error instead of
// catching a panic.
panic(err)
} else {
agent.ShutdownAgent()
agent.ShutdownEndpoints()
wait := time.Duration(rand.Int31n(2000)) * time.Millisecond
fmt.Println(id, a.Name, "retrying in", wait)
time.Sleep(wait)
}
// Clean out the data dir if we are responsible for it before we
// try again, since the old ports may have gotten written to
// the data dir, such as in the Raft configuration.
if a.DataDir != "" {
if err := os.RemoveAll(a.DataDir); err != nil {
require.Fail("%s %s Error resetting data dir: %s", id, a.Name, err)
}
}
return fmt.Errorf("%s %s Error starting agent: %s", id, a.Name, err)
}
a.Agent = agent
// Start the anti-entropy syncer
a.Agent.StartSync()
if err := a.waitForUp(); err != nil {
cleanupTmpDir()
a.Shutdown()
return err
}
a.dns = a.dnsServers[0]
a.srv = a.httpServers[0]
return nil
}
// waitForUp waits for leader election, or waits for the agent HTTP
// endpoint to start responding, depending on the agent config.
func (a *TestAgent) waitForUp() error {
timer := retry.TwoSeconds()
deadline := time.Now().Add(timer.Timeout)
var retErr error
var out structs.IndexedNodes
retry.Run(t, func(r *retry.R) {
for ; !time.Now().After(deadline); time.Sleep(timer.Wait) {
if len(a.httpServers) == 0 {
r.Fatal(a.Name, "waiting for server")
retErr = fmt.Errorf("%s: waiting for server", a.Name)
continue // fail, try again
}
if a.Config.Bootstrap && a.Config.ServerMode {
// Ensure we have a leader and a node registration.
@@ -208,26 +254,31 @@ func (a *TestAgent) Start(t *testing.T) *TestAgent {
},
}
if err := a.RPC("Catalog.ListNodes", args, &out); err != nil {
r.Fatal(a.Name, "Catalog.ListNodes failed:", err)
retErr = fmt.Errorf("Catalog.ListNodes failed: %v", err)
continue // fail, try again
}
if !out.QueryMeta.KnownLeader {
r.Fatal(a.Name, "No leader")
retErr = fmt.Errorf("%s: No leader", a.Name)
continue // fail, try again
}
if out.Index == 0 {
r.Fatal(a.Name, ": Consul index is 0")
retErr = fmt.Errorf("%s: Consul index is 0", a.Name)
continue // fail, try again
}
return nil // success
} else {
req, _ := http.NewRequest("GET", "/v1/agent/self", nil)
req := httptest.NewRequest("GET", "/v1/agent/self", nil)
resp := httptest.NewRecorder()
_, err := a.httpServers[0].AgentSelf(resp, req)
if err != nil || resp.Code != 200 {
r.Fatal(a.Name, "failed OK response", err)
retErr = fmt.Errorf("%s: failed OK response: %v", a.Name, err)
continue
}
return nil // success
}
})
a.dns = a.dnsServers[0]
a.srv = a.httpServers[0]
return a
}
return fmt.Errorf("unavailable. last error: %v", retErr)
}
// Shutdown stops the agent and removes the data directory if it is

4
command/monitor/monitor_test.go

@@ -20,7 +20,9 @@ func TestMonitorCommand_exitsOnSignalBeforeLinesArrive(t *testing.T) {
LogWriter: logWriter,
LogOutput: io.MultiWriter(os.Stderr, logWriter),
}
a.Start(t)
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
shutdownCh := make(chan struct{})

2
vendor/modules.txt vendored

@@ -456,8 +456,8 @@ github.com/spf13/pflag
# github.com/stretchr/objx v0.1.1
github.com/stretchr/objx
# github.com/stretchr/testify v1.3.0
github.com/stretchr/testify/require
github.com/stretchr/testify/mock
github.com/stretchr/testify/require
github.com/stretchr/testify/assert
# github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926
github.com/tv42/httpunix

Loading…
Cancel
Save