From aa08a4cb46a003b0c33aeb27b29cfe9201584f2d Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 25 Apr 2018 16:54:00 -0700 Subject: [PATCH] agent/proxy: Daemon works, tests cover it too --- agent/proxy/daemon.go | 128 +++++++++++++++++++++++++++++++++++++ agent/proxy/daemon_test.go | 91 ++++++++++++++++++++++++++ agent/proxy/proxy_test.go | 116 +++++++++++++++++++++++++++++++++ 3 files changed, 335 insertions(+) create mode 100644 agent/proxy/daemon_test.go create mode 100644 agent/proxy/proxy_test.go diff --git a/agent/proxy/daemon.go b/agent/proxy/daemon.go index f432271b6d..74fa62d440 100644 --- a/agent/proxy/daemon.go +++ b/agent/proxy/daemon.go @@ -1,7 +1,11 @@ package proxy import ( + "fmt" + "log" + "os" "os/exec" + "sync" ) // Daemon is a long-running proxy process. It is expected to keep running @@ -17,8 +21,132 @@ type Daemon struct { // ProxyToken is the special local-only ACL token that allows a proxy // to communicate to the Connect-specific endpoints. ProxyToken string + + // Logger is where logs will be sent around the management of this + // daemon. The actual logs for the daemon itself will be sent to + // a file. + Logger *log.Logger + + // process is the started process + lock sync.Mutex + process *os.Process + stopCh chan struct{} } // Start starts the daemon and keeps it running. +// +// This function returns after the process is successfully started. func (p *Daemon) Start() error { + p.lock.Lock() + defer p.lock.Unlock() + + // If the daemon is already started, return no error. + if p.stopCh != nil { + return nil + } + + // Start it for the first time + process, err := p.start() + if err != nil { + return err + } + + // Create the stop channel we use to notify when we've gracefully stopped + stopCh := make(chan struct{}) + p.stopCh = stopCh + + // Store the process so that we can signal it later + p.process = process + + go p.keepAlive(stopCh) + + return nil +} + +func (p *Daemon) keepAlive(stopCh chan struct{}) { + p.lock.Lock() + process := p.process + p.lock.Unlock() + + for { + if process == nil { + p.lock.Lock() + + // If we gracefully stopped (stopCh is closed) then don't restart. We + // check stopCh and not p.stopCh because the latter could reference + // a new process. + select { + case <-stopCh: + p.lock.Unlock() + return + default: + } + + // Process isn't started currently. We're restarting. + var err error + process, err = p.start() + if err != nil { + p.Logger.Printf("[ERR] agent/proxy: error restarting daemon: %s", err) + } + + p.process = process + p.lock.Unlock() + } + + _, err := process.Wait() + process = nil + p.Logger.Printf("[INFO] agent/proxy: daemon exited: %s", err) + } +} + +// start starts and returns the process. This will create a copy of the +// configured *exec.Command with the modifications documented on Daemon +// such as setting the proxy token environmental variable. +func (p *Daemon) start() (*os.Process, error) { + cmd := *p.Command + + // Add the proxy token to the environment. We first copy the env because + // it is a slice and therefore the "copy" above will only copy the slice + // reference. We allocate an exactly sized slice. + cmd.Env = make([]string, len(p.Command.Env), len(p.Command.Env)+1) + copy(cmd.Env, p.Command.Env) + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken)) + + // Start it + err := cmd.Start() + return cmd.Process, err +} + +// Stop stops the daemon. +// +// This will attempt a graceful stop (SIGINT) before force killing the +// process (SIGKILL). In either case, the process won't be automatically +// restarted unless Start is called again. +// +// This is safe to call multiple times. If the daemon is already stopped, +// then this returns no error. +func (p *Daemon) Stop() error { + p.lock.Lock() + defer p.lock.Unlock() + + // If we don't have a stopCh then we never even started yet. + if p.stopCh == nil { + return nil + } + + // If stopCh is closed, then we're already stopped + select { + case <-p.stopCh: + return nil + default: + } + + err := p.process.Signal(os.Interrupt) + + // This signals that we've stopped and therefore don't want to restart + close(p.stopCh) + p.stopCh = nil + + return err + //return p.Command.Process.Kill() } diff --git a/agent/proxy/daemon_test.go b/agent/proxy/daemon_test.go new file mode 100644 index 0000000000..0af971b931 --- /dev/null +++ b/agent/proxy/daemon_test.go @@ -0,0 +1,91 @@ +package proxy + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/hashicorp/consul/testutil/retry" + "github.com/hashicorp/go-uuid" + "github.com/stretchr/testify/require" +) + +func TestDaemonStartStop(t *testing.T) { + require := require.New(t) + td, closer := testTempDir(t) + defer closer() + + path := filepath.Join(td, "file") + uuid, err := uuid.GenerateUUID() + require.NoError(err) + + d := &Daemon{ + Command: helperProcess("start-stop", path), + ProxyToken: uuid, + Logger: testLogger, + } + require.NoError(d.Start()) + + // Wait for the file to exist + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + return + } + + r.Fatalf("error: %s", err) + }) + + // Verify that the contents of the file is the token. This verifies + // that we properly passed the token as an env var. + data, err := ioutil.ReadFile(path) + require.NoError(err) + require.Equal(uuid, string(data)) + + // Stop the process + require.NoError(d.Stop()) + + // File should no longer exist. + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if os.IsNotExist(err) { + return + } + + // err might be nil here but that's okay + r.Fatalf("should not exist: %s", err) + }) +} + +func TestDaemonRestart(t *testing.T) { + require := require.New(t) + td, closer := testTempDir(t) + defer closer() + path := filepath.Join(td, "file") + + d := &Daemon{ + Command: helperProcess("restart", path), + Logger: testLogger, + } + require.NoError(d.Start()) + defer d.Stop() + + // Wait for the file to exist. We save the func so we can reuse the test. + waitFile := func() { + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + return + } + r.Fatalf("error waiting for path: %s", err) + }) + } + waitFile() + + // Delete the file + require.NoError(os.Remove(path)) + + // File should re-appear because the process is restart + waitFile() +} diff --git a/agent/proxy/proxy_test.go b/agent/proxy/proxy_test.go new file mode 100644 index 0000000000..fa8eef1283 --- /dev/null +++ b/agent/proxy/proxy_test.go @@ -0,0 +1,116 @@ +package proxy + +import ( + "fmt" + "io/ioutil" + "log" + "os" + "os/exec" + "os/signal" + "testing" + "time" +) + +// testLogger is a logger that can be used by tests that require a +// *log.Logger instance. +var testLogger = log.New(os.Stderr, "logger: ", log.LstdFlags) + +// testTempDir returns a temporary directory and a cleanup function. +func testTempDir(t *testing.T) (string, func()) { + t.Helper() + + td, err := ioutil.TempDir("", "test-agent-proxy") + if err != nil { + t.Fatalf("err: %s", err) + } + + return td, func() { + if err := os.RemoveAll(td); err != nil { + t.Fatalf("err: %s", err) + } + } +} + +// helperProcess returns an *exec.Cmd that can be used to execute the +// TestHelperProcess function below. This can be used to test multi-process +// interactions. +func helperProcess(s ...string) *exec.Cmd { + cs := []string{"-test.run=TestHelperProcess", "--"} + cs = append(cs, s...) + env := []string{"GO_WANT_HELPER_PROCESS=1"} + + cmd := exec.Command(os.Args[0], cs...) + cmd.Env = append(env, os.Environ()...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd +} + +// This is not a real test. This is just a helper process kicked off by tests +// using the helperProcess helper function. +func TestHelperProcess(t *testing.T) { + if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { + return + } + + defer os.Exit(0) + + args := os.Args + for len(args) > 0 { + if args[0] == "--" { + args = args[1:] + break + } + + args = args[1:] + } + + if len(args) == 0 { + fmt.Fprintf(os.Stderr, "No command\n") + os.Exit(2) + } + + cmd, args := args[0], args[1:] + switch cmd { + // While running, this creates a file in the given directory (args[0]) + // and deletes it only whe nit is stopped. + case "start-stop": + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt) + defer signal.Stop(ch) + + path := args[0] + data := []byte(os.Getenv(EnvProxyToken)) + + if err := ioutil.WriteFile(path, data, 0644); err != nil { + t.Fatalf("err: %s", err) + } + defer os.Remove(path) + + <-ch + + // Restart writes to a file and keeps running while that file still + // exists. When that file is removed, this process exits. This can be + // used to test restarting. + case "restart": + // Write the file + path := args[0] + if err := ioutil.WriteFile(path, []byte("hello"), 0644); err != nil { + fmt.Fprintf(os.Stderr, "Error: %s\n", err) + os.Exit(1) + } + + // While the file still exists, do nothing. When the file no longer + // exists, we exit. + for { + time.Sleep(25 * time.Millisecond) + if _, err := os.Stat(path); os.IsNotExist(err) { + break + } + } + + default: + fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmd) + os.Exit(2) + } +}