mirror of https://github.com/hashicorp/consul
agent/proxy: Daemon works, tests cover it too
parent
e14fa850d8
commit
aa08a4cb46
|
@ -1,7 +1,11 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Daemon is a long-running proxy process. It is expected to keep running
|
||||
|
@ -17,8 +21,132 @@ type Daemon struct {
|
|||
// ProxyToken is the special local-only ACL token that allows a proxy
|
||||
// to communicate to the Connect-specific endpoints.
|
||||
ProxyToken string
|
||||
|
||||
// Logger is where logs will be sent around the management of this
|
||||
// daemon. The actual logs for the daemon itself will be sent to
|
||||
// a file.
|
||||
Logger *log.Logger
|
||||
|
||||
// process is the started process
|
||||
lock sync.Mutex
|
||||
process *os.Process
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// Start starts the daemon and keeps it running.
|
||||
//
|
||||
// This function returns after the process is successfully started.
|
||||
func (p *Daemon) Start() error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
// If the daemon is already started, return no error.
|
||||
if p.stopCh != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start it for the first time
|
||||
process, err := p.start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the stop channel we use to notify when we've gracefully stopped
|
||||
stopCh := make(chan struct{})
|
||||
p.stopCh = stopCh
|
||||
|
||||
// Store the process so that we can signal it later
|
||||
p.process = process
|
||||
|
||||
go p.keepAlive(stopCh)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Daemon) keepAlive(stopCh chan struct{}) {
|
||||
p.lock.Lock()
|
||||
process := p.process
|
||||
p.lock.Unlock()
|
||||
|
||||
for {
|
||||
if process == nil {
|
||||
p.lock.Lock()
|
||||
|
||||
// If we gracefully stopped (stopCh is closed) then don't restart. We
|
||||
// check stopCh and not p.stopCh because the latter could reference
|
||||
// a new process.
|
||||
select {
|
||||
case <-stopCh:
|
||||
p.lock.Unlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Process isn't started currently. We're restarting.
|
||||
var err error
|
||||
process, err = p.start()
|
||||
if err != nil {
|
||||
p.Logger.Printf("[ERR] agent/proxy: error restarting daemon: %s", err)
|
||||
}
|
||||
|
||||
p.process = process
|
||||
p.lock.Unlock()
|
||||
}
|
||||
|
||||
_, err := process.Wait()
|
||||
process = nil
|
||||
p.Logger.Printf("[INFO] agent/proxy: daemon exited: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// start starts and returns the process. This will create a copy of the
|
||||
// configured *exec.Command with the modifications documented on Daemon
|
||||
// such as setting the proxy token environmental variable.
|
||||
func (p *Daemon) start() (*os.Process, error) {
|
||||
cmd := *p.Command
|
||||
|
||||
// Add the proxy token to the environment. We first copy the env because
|
||||
// it is a slice and therefore the "copy" above will only copy the slice
|
||||
// reference. We allocate an exactly sized slice.
|
||||
cmd.Env = make([]string, len(p.Command.Env), len(p.Command.Env)+1)
|
||||
copy(cmd.Env, p.Command.Env)
|
||||
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken))
|
||||
|
||||
// Start it
|
||||
err := cmd.Start()
|
||||
return cmd.Process, err
|
||||
}
|
||||
|
||||
// Stop stops the daemon.
|
||||
//
|
||||
// This will attempt a graceful stop (SIGINT) before force killing the
|
||||
// process (SIGKILL). In either case, the process won't be automatically
|
||||
// restarted unless Start is called again.
|
||||
//
|
||||
// This is safe to call multiple times. If the daemon is already stopped,
|
||||
// then this returns no error.
|
||||
func (p *Daemon) Stop() error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
// If we don't have a stopCh then we never even started yet.
|
||||
if p.stopCh == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If stopCh is closed, then we're already stopped
|
||||
select {
|
||||
case <-p.stopCh:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
err := p.process.Signal(os.Interrupt)
|
||||
|
||||
// This signals that we've stopped and therefore don't want to restart
|
||||
close(p.stopCh)
|
||||
p.stopCh = nil
|
||||
|
||||
return err
|
||||
//return p.Command.Process.Kill()
|
||||
}
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/testutil/retry"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDaemonStartStop(t *testing.T) {
|
||||
require := require.New(t)
|
||||
td, closer := testTempDir(t)
|
||||
defer closer()
|
||||
|
||||
path := filepath.Join(td, "file")
|
||||
uuid, err := uuid.GenerateUUID()
|
||||
require.NoError(err)
|
||||
|
||||
d := &Daemon{
|
||||
Command: helperProcess("start-stop", path),
|
||||
ProxyToken: uuid,
|
||||
Logger: testLogger,
|
||||
}
|
||||
require.NoError(d.Start())
|
||||
|
||||
// Wait for the file to exist
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, err := os.Stat(path)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.Fatalf("error: %s", err)
|
||||
})
|
||||
|
||||
// Verify that the contents of the file is the token. This verifies
|
||||
// that we properly passed the token as an env var.
|
||||
data, err := ioutil.ReadFile(path)
|
||||
require.NoError(err)
|
||||
require.Equal(uuid, string(data))
|
||||
|
||||
// Stop the process
|
||||
require.NoError(d.Stop())
|
||||
|
||||
// File should no longer exist.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, err := os.Stat(path)
|
||||
if os.IsNotExist(err) {
|
||||
return
|
||||
}
|
||||
|
||||
// err might be nil here but that's okay
|
||||
r.Fatalf("should not exist: %s", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDaemonRestart(t *testing.T) {
|
||||
require := require.New(t)
|
||||
td, closer := testTempDir(t)
|
||||
defer closer()
|
||||
path := filepath.Join(td, "file")
|
||||
|
||||
d := &Daemon{
|
||||
Command: helperProcess("restart", path),
|
||||
Logger: testLogger,
|
||||
}
|
||||
require.NoError(d.Start())
|
||||
defer d.Stop()
|
||||
|
||||
// Wait for the file to exist. We save the func so we can reuse the test.
|
||||
waitFile := func() {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, err := os.Stat(path)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
r.Fatalf("error waiting for path: %s", err)
|
||||
})
|
||||
}
|
||||
waitFile()
|
||||
|
||||
// Delete the file
|
||||
require.NoError(os.Remove(path))
|
||||
|
||||
// File should re-appear because the process is restart
|
||||
waitFile()
|
||||
}
|
|
@ -0,0 +1,116 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// testLogger is a logger that can be used by tests that require a
|
||||
// *log.Logger instance.
|
||||
var testLogger = log.New(os.Stderr, "logger: ", log.LstdFlags)
|
||||
|
||||
// testTempDir returns a temporary directory and a cleanup function.
|
||||
func testTempDir(t *testing.T) (string, func()) {
|
||||
t.Helper()
|
||||
|
||||
td, err := ioutil.TempDir("", "test-agent-proxy")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
return td, func() {
|
||||
if err := os.RemoveAll(td); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// helperProcess returns an *exec.Cmd that can be used to execute the
|
||||
// TestHelperProcess function below. This can be used to test multi-process
|
||||
// interactions.
|
||||
func helperProcess(s ...string) *exec.Cmd {
|
||||
cs := []string{"-test.run=TestHelperProcess", "--"}
|
||||
cs = append(cs, s...)
|
||||
env := []string{"GO_WANT_HELPER_PROCESS=1"}
|
||||
|
||||
cmd := exec.Command(os.Args[0], cs...)
|
||||
cmd.Env = append(env, os.Environ()...)
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
return cmd
|
||||
}
|
||||
|
||||
// This is not a real test. This is just a helper process kicked off by tests
|
||||
// using the helperProcess helper function.
|
||||
func TestHelperProcess(t *testing.T) {
|
||||
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
|
||||
return
|
||||
}
|
||||
|
||||
defer os.Exit(0)
|
||||
|
||||
args := os.Args
|
||||
for len(args) > 0 {
|
||||
if args[0] == "--" {
|
||||
args = args[1:]
|
||||
break
|
||||
}
|
||||
|
||||
args = args[1:]
|
||||
}
|
||||
|
||||
if len(args) == 0 {
|
||||
fmt.Fprintf(os.Stderr, "No command\n")
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
cmd, args := args[0], args[1:]
|
||||
switch cmd {
|
||||
// While running, this creates a file in the given directory (args[0])
|
||||
// and deletes it only whe nit is stopped.
|
||||
case "start-stop":
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, os.Interrupt)
|
||||
defer signal.Stop(ch)
|
||||
|
||||
path := args[0]
|
||||
data := []byte(os.Getenv(EnvProxyToken))
|
||||
|
||||
if err := ioutil.WriteFile(path, data, 0644); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
defer os.Remove(path)
|
||||
|
||||
<-ch
|
||||
|
||||
// Restart writes to a file and keeps running while that file still
|
||||
// exists. When that file is removed, this process exits. This can be
|
||||
// used to test restarting.
|
||||
case "restart":
|
||||
// Write the file
|
||||
path := args[0]
|
||||
if err := ioutil.WriteFile(path, []byte("hello"), 0644); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// While the file still exists, do nothing. When the file no longer
|
||||
// exists, we exit.
|
||||
for {
|
||||
time.Sleep(25 * time.Millisecond)
|
||||
if _, err := os.Stat(path); os.IsNotExist(err) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmd)
|
||||
os.Exit(2)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue