diff --git a/agent/proxy/manager.go b/agent/proxy/manager.go index 05445d93aa..765e9f0220 100644 --- a/agent/proxy/manager.go +++ b/agent/proxy/manager.go @@ -173,7 +173,7 @@ func (m *Manager) Run() { // We wait for anything not running, just so we're more resilient // in the face of state machine issues. Basically any state change // will cause us to quit. - for m.runState != managerStateRunning { + for m.runState == managerStateRunning { m.cond.Wait() } }() @@ -240,7 +240,7 @@ func (m *Manager) sync() { delete(state, id) // TODO: diff and restart if necessary - println(stateProxy) + println("DIFF", id, stateProxy) } // Remaining entries in state are new proxies. Start them! diff --git a/agent/proxy/manager_test.go b/agent/proxy/manager_test.go index 13a57d7b74..eba7a674a8 100644 --- a/agent/proxy/manager_test.go +++ b/agent/proxy/manager_test.go @@ -5,6 +5,7 @@ import ( "os/exec" "path/filepath" "testing" + "time" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" @@ -28,7 +29,7 @@ func TestManagerClose_noRun(t *testing.T) { func TestManagerRun_initialSync(t *testing.T) { t.Parallel() - state := testState(t) + state := local.TestState(t) m := NewManager() m.State = state defer m.Kill() @@ -37,7 +38,7 @@ func TestManagerRun_initialSync(t *testing.T) { td, closer := testTempDir(t) defer closer() path := filepath.Join(td, "file") - testStateProxy(t, state, helperProcess("restart", path)) + testStateProxy(t, state, "web", helperProcess("restart", path)) // Start the manager go m.Run() @@ -52,27 +53,103 @@ func TestManagerRun_initialSync(t *testing.T) { }) } -func testState(t *testing.T) *local.State { +func TestManagerRun_syncNew(t *testing.T) { + t.Parallel() + state := local.TestState(t) - require.NoError(t, state.AddService(&structs.NodeService{ - Service: "web", - }, "web")) + m := NewManager() + m.State = state + defer m.Kill() - return state + // Start the manager + go m.Run() + + // Sleep a bit, this is just an attempt for Run to already be running. + // Its not a big deal if this sleep doesn't happen (slow CI). + time.Sleep(100 * time.Millisecond) + + // Add the first proxy + td, closer := testTempDir(t) + defer closer() + path := filepath.Join(td, "file") + testStateProxy(t, state, "web", helperProcess("restart", path)) + + // We should see the path appear shortly + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + return + } + r.Fatalf("error waiting for path: %s", err) + }) + + // Add another proxy + path = path + "2" + testStateProxy(t, state, "db", helperProcess("restart", path)) + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + return + } + r.Fatalf("error waiting for path: %s", err) + }) +} + +func TestManagerRun_syncDelete(t *testing.T) { + t.Parallel() + + state := local.TestState(t) + m := NewManager() + m.State = state + defer m.Kill() + + // Start the manager + go m.Run() + + // Add the first proxy + td, closer := testTempDir(t) + defer closer() + path := filepath.Join(td, "file") + id := testStateProxy(t, state, "web", helperProcess("restart", path)) + + // We should see the path appear shortly + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + return + } + r.Fatalf("error waiting for path: %s", err) + }) + + // Remove the proxy + _, err := state.RemoveProxy(id) + require.NoError(t, err) + + // File should disappear as process is killed + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + r.Fatalf("path exists") + } + }) } // testStateProxy registers a proxy with the given local state and the command // (expected to be from the helperProcess function call). It returns the // ID for deregistration. -func testStateProxy(t *testing.T, state *local.State, cmd *exec.Cmd) string { +func testStateProxy(t *testing.T, state *local.State, service string, cmd *exec.Cmd) string { command := []string{cmd.Path} command = append(command, cmd.Args...) + require.NoError(t, state.AddService(&structs.NodeService{ + Service: service, + }, "token")) + p, err := state.AddProxy(&structs.ConnectManagedProxy{ ExecMode: structs.ProxyExecModeDaemon, Command: command, - TargetServiceID: "web", - }, "web") + TargetServiceID: service, + }, "token") require.NoError(t, err) return p.Proxy.ProxyService.ID diff --git a/agent/proxy/proxy_test.go b/agent/proxy/proxy_test.go index 1d6df99ef5..11994b1bfd 100644 --- a/agent/proxy/proxy_test.go +++ b/agent/proxy/proxy_test.go @@ -91,6 +91,10 @@ func TestHelperProcess(t *testing.T) { // exists. When that file is removed, this process exits. This can be // used to test restarting. case "restart": + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt) + defer signal.Stop(ch) + // Write the file path := args[0] if err := ioutil.WriteFile(path, []byte("hello"), 0644); err != nil { @@ -105,6 +109,15 @@ func TestHelperProcess(t *testing.T) { if _, err := os.Stat(path); os.IsNotExist(err) { break } + + select { + case <-ch: + // We received an interrupt, clean exit + os.Remove(path) + break + + default: + } } default: