diff --git a/agent/agent.go b/agent/agent.go
index f70c16379a..365a76af0c 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -2038,58 +2038,38 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error
 	// Lookup the target service token in state if there is one.
 	token := a.State.ServiceToken(proxy.TargetServiceID)
 
-	// Determine if we need to default the command
-	if proxy.ExecMode == structs.ProxyExecModeDaemon && len(proxy.Command) == 0 {
-		// We use the globally configured default command. If it is empty
-		// then we need to determine the subcommand for this agent.
-		cmd := a.config.ConnectProxyDefaultDaemonCommand
-		if len(cmd) == 0 {
-			var err error
-			cmd, err = a.defaultProxyCommand()
-			if err != nil {
-				return err
+	/*
+		// Determine if we need to default the command
+		if proxy.ExecMode == structs.ProxyExecModeDaemon && len(proxy.Command) == 0 {
+			// We use the globally configured default command. If it is empty
+			// then we need to determine the subcommand for this agent.
+			cmd := a.config.ConnectProxyDefaultDaemonCommand
+			if len(cmd) == 0 {
+				var err error
+				cmd, err = a.defaultProxyCommand()
+				if err != nil {
+					return err
+				}
 			}
-		}
 
-		proxy.CommandDefault = cmd
-	}
+			proxy.CommandDefault = cmd
+		}
+	*/
 
 	// Add the proxy to local state first since we may need to assign a port which
 	// needs to be coordinate under state lock. AddProxy will generate the
 	// NodeService for the proxy populated with the allocated (or configured) port
 	// and an ID, but it doesn't add it to the agent directly since that could
 	// deadlock and we may need to coordinate adding it and persisting etc.
-	proxyState, oldProxy, err := a.State.AddProxy(proxy, token)
+	proxyState, err := a.State.AddProxy(proxy, token)
 	if err != nil {
 		return err
 	}
 	proxyService := proxyState.Proxy.ProxyService
 
-	// If we replaced an existing proxy, stop that process.
-	if oldProxy != nil {
-		if err := oldProxy.ProxyProcess.Stop(); err != nil {
-			a.logger.Printf(
-				"[ERR] error stopping managed proxy, may still be running: %s",
-				err)
-		}
-	}
-
-	// Start the proxy process
-	if err := proxyState.ProxyProcess.Start(); err != nil {
-		a.State.RemoveProxy(proxyService.ID)
-		return fmt.Errorf("error starting managed proxy: %s", err)
-	}
-
 	// TODO(banks): register proxy health checks.
 	err = a.AddService(proxyService, nil, persist, token)
 	if err != nil {
-		// Stop the proxy process if it was started
-		if err := proxyState.ProxyProcess.Stop(); err != nil {
-			a.logger.Printf(
-				"[ERR] error stopping managed proxy, may still be running: %s",
-				err)
-		}
-
 		// Remove the state too
 		a.State.RemoveProxy(proxyService.ID)
 		return err
@@ -2107,18 +2087,10 @@ func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
 	}
 
 	// Remove the proxy from the local state
-	proxyState, err := a.State.RemoveProxy(proxyID)
-	if err != nil {
+	if _, err := a.State.RemoveProxy(proxyID); err != nil {
 		return err
 	}
 
-	// Stop the process. The proxy implementation is expected to perform
-	// retries so if this fails then retries have already been performed and
-	// the most we can do is just error.
-	if err := proxyState.ProxyProcess.Stop(); err != nil {
-		return fmt.Errorf("error stopping managed proxy process: %s", err)
-	}
-
 	// TODO(banks): unpersist proxy
 
 	return nil
diff --git a/agent/local/testing.go b/agent/local/testing.go
new file mode 100644
index 0000000000..6ca9d12aea
--- /dev/null
+++ b/agent/local/testing.go
@@ -0,0 +1,19 @@
+package local
+
+import (
+	"log"
+	"os"
+
+	"github.com/hashicorp/consul/agent/token"
+	"github.com/mitchellh/go-testing-interface"
+)
+
+// TestState returns a configured *State for testing.
+func TestState(t testing.T) *State {
+	result := NewState(Config{
+		ProxyBindMinPort: 20000,
+		ProxyBindMaxPort: 20500,
+	}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
+	result.TriggerSyncChanges = func() {}
+	return result
+}
diff --git a/agent/proxy/daemon_test.go b/agent/proxy/daemon_test.go
index dbd2099bcc..22948bdaf0 100644
--- a/agent/proxy/daemon_test.go
+++ b/agent/proxy/daemon_test.go
@@ -16,6 +16,8 @@ func TestDaemon_impl(t *testing.T) {
 }
 
 func TestDaemonStartStop(t *testing.T) {
+	t.Parallel()
+
 	require := require.New(t)
 	td, closer := testTempDir(t)
 	defer closer()
@@ -63,6 +65,8 @@ func TestDaemonStartStop(t *testing.T) {
 }
 
 func TestDaemonRestart(t *testing.T) {
+	t.Parallel()
+
 	require := require.New(t)
 	td, closer := testTempDir(t)
 	defer closer()
diff --git a/agent/proxy/manager.go b/agent/proxy/manager.go
new file mode 100644
index 0000000000..05445d93aa
--- /dev/null
+++ b/agent/proxy/manager.go
@@ -0,0 +1,300 @@
+package proxy
+
+import (
+	"fmt"
+	"log"
+	"os"
+	"os/exec"
+	"sync"
+
+	"github.com/hashicorp/consul/agent/local"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/go-multierror"
+)
+
+// Manager starts, stops, snapshots, and restores managed proxies.
+//
+// The manager will not start or stop any processes until Run is called.
+// Prior to this, any configuration, snapshot loading, etc. can be done.
+// Even if a process is no longer running after loading the snapshot, it
+// will not be restarted until Run is called.
+//
+// The Manager works by subscribing to change notifications on a local.State
+// structure. Whenever a change is detected, the Manager syncs its internal
+// state with the local.State and starts/stops any necessary proxies. The
+// manager never holds a lock on local.State (except to read the proxies)
+// and state updates may occur while the Manager is syncing. This is okay,
+// since a change notification will be queued to trigger another sync.
+//
+// NOTE(mitchellh): Change notifications are not coalesced currently. Under
+// conditions where managed proxy configurations are changing in a hot
+// loop, it is possible for the manager to constantly attempt to sync. This
+// is unlikely, but it's also easy to introduce basic coalescing (even over
+// millisecond intervals) to prevent wasted compute cycles.
+type Manager struct {
+	// State is the local state that is the source of truth for all
+	// configured managed proxies.
+	State *local.State
+
+	// Logger is the logger for information about manager behavior.
+	// Output for proxies will not go here generally but varies by proxy
+	// implementation type.
+	Logger *log.Logger
+
+	// lock is held while reading/writing any internal state of the manager.
+	// cond is a condition variable on lock that is broadcasted for runState
+	// changes.
+	lock *sync.Mutex
+	cond *sync.Cond
+
+	// runState is the current state of the manager. To read this the
+	// lock must be held. The condition variable cond can be waited on
+	// for changes to this value.
+	runState managerRunState
+
+	proxies map[string]Proxy
+}
+
+// defaultLogger is the default logger for NewManager so that it is never nil.
+var defaultLogger = log.New(os.Stderr, "", log.LstdFlags)
+
+// NewManager initializes a Manager. After initialization, the exported
+// fields should be configured as desired. To start the Manager, execute
+// Run in a goroutine.
+func NewManager() *Manager {
+	var lock sync.Mutex
+	return &Manager{
+		Logger:  defaultLogger,
+		lock:    &lock,
+		cond:    sync.NewCond(&lock),
+		proxies: make(map[string]Proxy),
+	}
+}
+
+// managerRunState is the state of the Manager.
+//
+// This is a basic state machine with the following transitions:
+//
+//   * idle => running, stopped
+//   * running => stopping, stopped
+//   * stopping => stopped
+//   * stopped => <>
+//
+type managerRunState uint8
+
+const (
+	managerStateIdle managerRunState = iota
+	managerStateRunning
+	managerStateStopping
+	managerStateStopped
+)
+
+// Close stops the manager. Managed processes are NOT stopped.
+func (m *Manager) Close() error {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	for {
+		switch m.runState {
+		case managerStateIdle:
+			// Idle so just set it to stopped and return. We notify
+			// the condition variable in case others are waiting.
+			m.runState = managerStateStopped
+			m.cond.Broadcast()
+			return nil
+
+		case managerStateRunning:
+			// Set the state to stopping and broadcast to all waiters,
+			// since Run is sitting on cond.Wait.
+			m.runState = managerStateStopping
+			m.cond.Broadcast()
+			m.cond.Wait() // Wait on the stopping event
+
+		case managerStateStopping:
+			// Still stopping, wait...
+			m.cond.Wait()
+
+		case managerStateStopped:
+			// Stopped, target state reached
+			return nil
+		}
+	}
+}
+
+// Kill will Close the manager and Kill all proxies that were being managed.
+//
+// This is safe to call with Close already called since Close is idempotent.
+func (m *Manager) Kill() error {
+	// Close first so that we aren't getting changes in proxies
+	if err := m.Close(); err != nil {
+		return err
+	}
+
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	var err error
+	for id, proxy := range m.proxies {
+		if stopErr := proxy.Stop(); stopErr != nil {
+			err = multierror.Append(
+				err, fmt.Errorf("failed to stop proxy %q: %s", id, stopErr))
+			continue
+		}
+
+		// Remove it since it is already stopped successfully
+		delete(m.proxies, id)
+	}
+
+	return err
+}
+
+// Run syncs with the local state and supervises existing proxies.
+//
+// This blocks and should be run in a goroutine. If another Run is already
+// executing, this will do nothing and return.
+func (m *Manager) Run() {
+	m.lock.Lock()
+	if m.runState != managerStateIdle {
+		m.lock.Unlock()
+		return
+	}
+
+	// Set the state to running
+	m.runState = managerStateRunning
+	m.lock.Unlock()
+
+	// Start a goroutine that just waits for a stop request
+	stopCh := make(chan struct{})
+	go func() {
+		defer close(stopCh)
+		m.lock.Lock()
+		defer m.lock.Unlock()
+
+		// We wait for anything not running, just so we're more resilient
+		// in the face of state machine issues. Basically any state change
+		// will cause us to quit.
+		for m.runState == managerStateRunning {
+			m.cond.Wait()
+		}
+	}()
+
+	// When we exit, we set the state to stopped and broadcast to any
+	// waiting Close functions that they can return.
+	defer func() {
+		m.lock.Lock()
+		m.runState = managerStateStopped
+		m.cond.Broadcast()
+		m.lock.Unlock()
+	}()
+
+	// Register for proxy catalog change notifications
+	notifyCh := make(chan struct{}, 1)
+	m.State.NotifyProxy(notifyCh)
+	defer m.State.StopNotifyProxy(notifyCh)
+
+	for {
+		// Sync first, before waiting on further notifications so that
+		// we can start with a known-current state.
+		m.sync()
+
+		select {
+		case <-notifyCh:
+			// Changes exit select so we can reloop and reconfigure proxies
+
+		case <-stopCh:
+			// Stop immediately, no cleanup
+			return
+		}
+	}
+}
+
+// sync syncs data with the local state store to update the current manager
+// state and start/stop necessary proxies.
+func (m *Manager) sync() {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	// Get the current set of proxies
+	state := m.State.Proxies()
+
+	// Go through our existing proxies that we're currently managing to
+	// determine if they're still in the state or not. If they're in the
+	// state, we need to diff to determine if we're starting a new proxy.
+	// If they're not in the state, then we need to stop the proxy since it
+	// is now orphaned.
+	for id, proxy := range m.proxies {
+		// Get the proxy.
+		stateProxy, ok := state[id]
+		if !ok {
+			// Proxy is deregistered. Remove it from our map and stop it
+			delete(m.proxies, id)
+			if err := proxy.Stop(); err != nil {
+				m.Logger.Printf("[ERROR] agent/proxy: failed to stop deregistered proxy for %q: %s", id, err)
+			}
+
+			continue
+		}
+
+		// Proxy is in the state. Always delete it so that the remainder
+		// are NEW proxies that we start after this loop.
+		delete(state, id)
+
+		// TODO: diff and restart if necessary
+		println(stateProxy)
+	}
+
+	// Remaining entries in state are new proxies. Start them!
+	for id, stateProxy := range state {
+		proxy, err := m.newProxy(stateProxy)
+		if err != nil {
+			m.Logger.Printf("[ERROR] agent/proxy: failed to initialize proxy for %q: %s", id, err)
+			continue
+		}
+
+		if err := proxy.Start(); err != nil {
+			m.Logger.Printf("[ERROR] agent/proxy: failed to start proxy for %q: %s", id, err)
+			continue
+		}
+
+		m.proxies[id] = proxy
+	}
+}
+
+// newProxy creates the proper Proxy implementation for the configured
+// local managed proxy.
+func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
+	// Defensive because the alternative is to panic which is not desired
+	if mp == nil || mp.Proxy == nil {
+		return nil, fmt.Errorf("internal error: nil *local.ManagedProxy or Proxy field")
+	}
+
+	p := mp.Proxy
+	switch p.ExecMode {
+	case structs.ProxyExecModeDaemon:
+		command := p.Command
+		if len(command) == 0 {
+			command = p.CommandDefault
+		}
+
+		// This should never happen since validation should happen upstream
+		// but verify it because the alternative is to panic below.
+		if len(command) == 0 {
+			return nil, fmt.Errorf("daemon mode managed proxy requires command")
+		}
+
+		// Build the command to execute.
+		var cmd exec.Cmd
+		cmd.Path = command[0]
+		cmd.Args = command[1:]
+
+		// Build the daemon structure
+		return &Daemon{
+			Command:    &cmd,
+			ProxyToken: mp.ProxyToken,
+			Logger:     m.Logger,
+		}, nil
+
+	default:
+		return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode)
+	}
+}
diff --git a/agent/proxy/manager_test.go b/agent/proxy/manager_test.go
new file mode 100644
index 0000000000..13a57d7b74
--- /dev/null
+++ b/agent/proxy/manager_test.go
@@ -0,0 +1,79 @@
+package proxy
+
+import (
+	"os"
+	"os/exec"
+	"path/filepath"
+	"testing"
+
+	"github.com/hashicorp/consul/agent/local"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/testutil/retry"
+	"github.com/stretchr/testify/require"
+)
+
+func TestManagerClose_noRun(t *testing.T) {
+	t.Parallel()
+
+	// Really we're testing that it doesn't deadlock here.
+	m := NewManager()
+	require.NoError(t, m.Close())
+
+	// Close again for sanity
+	require.NoError(t, m.Close())
+}
+
+// Test that Run performs an initial sync (if local.State is already set)
+// rather than waiting for a notification from the local state.
+func TestManagerRun_initialSync(t *testing.T) {
+	t.Parallel()
+
+	state := testState(t)
+	m := NewManager()
+	m.State = state
+	defer m.Kill()
+
+	// Add the proxy before we start the manager to verify initial sync
+	td, closer := testTempDir(t)
+	defer closer()
+	path := filepath.Join(td, "file")
+	testStateProxy(t, state, helperProcess("restart", path))
+
+	// Start the manager
+	go m.Run()
+
+	// We should see the path appear shortly
+	retry.Run(t, func(r *retry.R) {
+		_, err := os.Stat(path)
+		if err == nil {
+			return
+		}
+		r.Fatalf("error waiting for path: %s", err)
+	})
+}
+
+func testState(t *testing.T) *local.State {
+	state := local.TestState(t)
+	require.NoError(t, state.AddService(&structs.NodeService{
+		Service: "web",
+	}, "web"))
+
+	return state
+}
+
+// testStateProxy registers a proxy with the given local state and the command
+// (expected to be from the helperProcess function call). It returns the
+// ID for deregistration.
+func testStateProxy(t *testing.T, state *local.State, cmd *exec.Cmd) string {
+	command := []string{cmd.Path}
+	command = append(command, cmd.Args...)
+
+	p, err := state.AddProxy(&structs.ConnectManagedProxy{
+		ExecMode:        structs.ProxyExecModeDaemon,
+		Command:         command,
+		TargetServiceID: "web",
+	}, "web")
+	require.NoError(t, err)
+
+	return p.Proxy.ProxyService.ID
+}
diff --git a/agent/proxy/proxy_test.go b/agent/proxy/proxy_test.go
index fa8eef1283..1d6df99ef5 100644
--- a/agent/proxy/proxy_test.go
+++ b/agent/proxy/proxy_test.go
@@ -31,16 +31,19 @@ func testTempDir(t *testing.T) (string, func()) {
 	}
 }
 
+// helperProcessSentinel is a sentinel value that is put as the first
+// argument following "--" and is used to determine if TestHelperProcess
+// should run.
+const helperProcessSentinel = "WANT_HELPER_PROCESS"
+
 // helperProcess returns an *exec.Cmd that can be used to execute the
 // TestHelperProcess function below. This can be used to test multi-process
 // interactions.
 func helperProcess(s ...string) *exec.Cmd {
-	cs := []string{"-test.run=TestHelperProcess", "--"}
+	cs := []string{"-test.run=TestHelperProcess", "--", helperProcessSentinel}
 	cs = append(cs, s...)
 
-	env := []string{"GO_WANT_HELPER_PROCESS=1"}
 	cmd := exec.Command(os.Args[0], cs...)
-	cmd.Env = append(env, os.Environ()...)
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
 	return cmd
@@ -49,12 +52,6 @@ func helperProcess(s ...string) *exec.Cmd {
 // This is not a real test. This is just a helper process kicked off by tests
 // using the helperProcess helper function.
 func TestHelperProcess(t *testing.T) {
-	if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
-		return
-	}
-
-	defer os.Exit(0)
-
 	args := os.Args
 	for len(args) > 0 {
 		if args[0] == "--" {
@@ -65,15 +62,16 @@ func TestHelperProcess(t *testing.T) {
 		args = args[1:]
 	}
 
-	if len(args) == 0 {
-		fmt.Fprintf(os.Stderr, "No command\n")
-		os.Exit(2)
+	if len(args) == 0 || args[0] != helperProcessSentinel {
+		return
 	}
 
+	defer os.Exit(0)
+	args = args[1:] // strip sentinel value
 	cmd, args := args[0], args[1:]
 	switch cmd {
 	// While running, this creates a file in the given directory (args[0])
-	// and deletes it only whe nit is stopped.
+	// and deletes it only when it is stopped.
 	case "start-stop":
 		ch := make(chan os.Signal, 1)
 		signal.Notify(ch, os.Interrupt)
diff --git a/agent/proxy/test.go b/agent/proxy/test.go
new file mode 100644
index 0000000000..b6b35bb04b
--- /dev/null
+++ b/agent/proxy/test.go
@@ -0,0 +1,13 @@
+package proxy
+
+// defaultTestProxy is the test proxy that is instantiated for proxies with
+// an execution mode of ProxyExecModeTest.
+var defaultTestProxy = testProxy{}
+
+// testProxy is a Proxy implementation that stores state in-memory and
+// is only used for unit testing. It is in a non _test.go file because the
+// factory for initializing it is exported (newProxy).
+type testProxy struct {
+	Start uint32
+	Stop  uint32
+}
diff --git a/agent/structs/connect.go b/agent/structs/connect.go
index b40091adff..02b5ba1fa4 100644
--- a/agent/structs/connect.go
+++ b/agent/structs/connect.go
@@ -33,6 +33,11 @@ const (
 	// ProxyExecModeScript executes a proxy config script on each change to it's
 	// config.
 	ProxyExecModeScript
+
+	// ProxyExecModeTest tracks the start/stop of the proxy in-memory
+	// and is only used for tests. This shouldn't be set outside of tests,
+	// but even if it is it has no external effect.
+	ProxyExecModeTest
 )
 
 // String implements Stringer
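
Taken together, these changes move process start/stop out of Agent.AddProxy and Agent.RemoveProxy and into the new proxy.Manager, which watches local.State and reconciles running daemons against the registered managed proxies. The following is a rough sketch, not part of the diff, of how the pieces fit together, based only on the APIs shown above (local.NewState, State.AddService, State.AddProxy, proxy.NewManager, Run, Kill); the daemon command, ports, and service name are illustrative assumptions.

// Sketch only: wires the new proxy.Manager to a local.State the way the
// tests above do. Names marked "illustrative" are not from the diff.
package main

import (
	"log"
	"os"
	"time"

	"github.com/hashicorp/consul/agent/local"
	"github.com/hashicorp/consul/agent/proxy"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/token"
)

func main() {
	logger := log.New(os.Stderr, "", log.LstdFlags)

	// local.State is the source of truth for configured managed proxies.
	state := local.NewState(local.Config{
		ProxyBindMinPort: 20000,
		ProxyBindMaxPort: 20500,
	}, logger, &token.Store{})
	state.TriggerSyncChanges = func() {}

	// The Manager subscribes to proxy changes in the state and starts or
	// stops daemon processes to match it. Run blocks, so it goes in a goroutine.
	m := proxy.NewManager()
	m.State = state
	m.Logger = logger
	go m.Run()

	// Registering a service and a daemon-mode proxy in local state is all
	// that is needed; the manager picks it up on its next sync.
	if err := state.AddService(&structs.NodeService{Service: "web"}, ""); err != nil {
		logger.Fatalf("[ERR] adding service: %s", err)
	}
	if _, err := state.AddProxy(&structs.ConnectManagedProxy{
		ExecMode:        structs.ProxyExecModeDaemon,
		Command:         []string{"/usr/local/bin/my-proxy", "-listen", ":21000"}, // illustrative command
		TargetServiceID: "web",
	}, ""); err != nil {
		logger.Fatalf("[ERR] adding proxy: %s", err)
	}

	// Give the manager a moment to sync and start the process.
	time.Sleep(time.Second)

	// Kill closes the manager and stops every managed process; Close alone
	// would stop the manager but leave the processes running.
	if err := m.Kill(); err != nil {
		logger.Printf("[ERR] stopping proxies: %s", err)
	}
}

The split between Close and Kill is why the agent-side start/stop and replacement logic removed from agent.go is no longer needed: deregistering a proxy from local state is enough for the manager to stop the orphaned process on a later sync.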