diff --git a/agent/proxy/manager.go b/agent/proxy/manager.go
index c9c63311f1..7a8cd5d4cd 100644
--- a/agent/proxy/manager.go
+++ b/agent/proxy/manager.go
@@ -25,6 +25,11 @@ const (
 	// changes. Then the whole cycle resets.
 	ManagerCoalescePeriod  = 5 * time.Second
 	ManagerQuiescentPeriod = 500 * time.Millisecond
+
+	// ManagerSnapshotPeriod is the interval at which snapshots are taken.
+	// The last snapshot state is preserved and if the new state matches it,
+	// no file is written, so it's safe for this to be reasonably frequent.
+	ManagerSnapshotPeriod = 1 * time.Second
 )
 
 // Manager starts, stops, snapshots, and restores managed proxies.
@@ -64,9 +69,14 @@ type Manager struct {
 	//
 	DataDir string
 
-	// SnapshotDir is the path to the directory where snapshots will
-	// be written
-	SnapshotDir string
+	// SnapshotPeriod is the duration between snapshots. This can be set
+	// relatively low to ensure accuracy, because if the new snapshot matches
+	// the last snapshot taken, no file will be written. Therefore, setting
+	// this low costs only slight CPU/memory usage and doesn't result in
+	// disk IO. If this isn't set, ManagerSnapshotPeriod is the default.
+	//
+	// This only has an effect if snapshots are enabled (DataDir is set).
+	SnapshotPeriod time.Duration
 
 	// CoalescePeriod and QuiescencePeriod control the timers for coalescing
 	// updates from the local state. See the defaults at the top of this
@@ -86,7 +96,8 @@ type Manager struct {
 	// for changes to this value.
 	runState managerRunState
 
-	proxies map[string]Proxy
+	proxies      map[string]Proxy
+	lastSnapshot *snapshot
 }
 
 // NewManager initializes a Manager. After initialization, the exported
@@ -96,6 +107,7 @@ func NewManager() *Manager {
 	var lock sync.Mutex
 	return &Manager{
 		Logger:          defaultLogger,
+		SnapshotPeriod:  ManagerSnapshotPeriod,
 		CoalescePeriod:  ManagerCoalescePeriod,
 		QuiescentPeriod: ManagerQuiescentPeriod,
 		lock:            &lock,
@@ -228,6 +240,12 @@ func (m *Manager) Run() {
 	m.State.NotifyProxy(notifyCh)
 	defer m.State.StopNotifyProxy(notifyCh)
 
+	// Start the timer for snapshots. We don't use a ticker because disk
+	// IO can be slow and we don't want overlapping snapshots, so we only
+	// reset the timer once a snapshot is complete rather than continuously.
+	snapshotTimer := time.NewTimer(m.SnapshotPeriod)
+	defer snapshotTimer.Stop()
+
 	m.Logger.Println("[DEBUG] agent/proxy: managed Connect proxy manager started")
 SYNC:
 	for {
@@ -261,6 +279,17 @@ SYNC:
 		case <-quiescent:
 			continue SYNC
 
+		case <-snapshotTimer.C:
+			// Perform a snapshot
+			if path := m.SnapshotPath(); path != "" {
+				if err := m.snapshot(path, true); err != nil {
+					m.Logger.Printf("[WARN] agent/proxy: failed to snapshot state: %s", err)
+				}
+			}
+
+			// Reset the timer only once the snapshot completes
+			snapshotTimer.Reset(m.SnapshotPeriod)
+
 		case <-stopCh:
 			// Stop immediately, no cleanup
 			m.Logger.Println("[DEBUG] agent/proxy: Stopping managed Connect proxy manager")
@@ -342,10 +371,22 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
 	if mp == nil || mp.Proxy == nil {
 		return nil, fmt.Errorf("internal error: nil *local.ManagedProxy or Proxy field")
 	}
-
 	p := mp.Proxy
-	switch p.ExecMode {
-	case structs.ProxyExecModeDaemon:
+
+	// We reuse the service ID a few times
+	id := p.ProxyService.ID
+
+	// Create the Proxy. We could just as easily switch on p.ExecMode
+	// here, but we want a single location where ExecMode maps to a Proxy
+	// implementation, since that lowers the chance of getting it wrong.
+	proxy, err := m.newProxyFromMode(p.ExecMode, id)
+	if err != nil {
+		return nil, err
+	}
+
+	// Depending on the proxy type, configure the rest from our ManagedProxy
+	switch proxy := proxy.(type) {
+	case *Daemon:
 		command := p.Command
 
 		// This should never happen since validation should happen upstream
@@ -354,9 +395,6 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
 			return nil, fmt.Errorf("daemon mode managed proxy requires command")
 		}
 
-		// We reuse the service ID a few times
-		id := p.ProxyService.ID
-
 		// Build the command to execute.
 		var cmd exec.Cmd
 		cmd.Path = command[0]
@@ -366,18 +404,31 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
 		}
 
 		// Build the daemon structure
-		return &Daemon{
-			Command:    &cmd,
-			ProxyToken: mp.ProxyToken,
-			Logger:     m.Logger,
-			PidPath:    pidPath(filepath.Join(m.DataDir, "pids"), id),
-		}, nil
+		proxy.Command = &cmd
+		proxy.ProxyToken = mp.ProxyToken
+		return proxy, nil
 
 	default:
 		return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode)
 	}
 }
 
+// newProxyFromMode initializes the proxy structure from only the mode
+// and the service ID. This is a shared method between newProxy and Restore
+// so that we have just one location where we turn ExecMode into a Proxy.
+func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy, error) {
+	switch mode {
+	case structs.ProxyExecModeDaemon:
+		return &Daemon{
+			Logger:  m.Logger,
+			PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id),
+		}, nil
+
+	default:
+		return nil, fmt.Errorf("unsupported managed proxy type: %q", mode)
+	}
+}
+
 // configureLogDir sets up the file descriptors to stdout/stderr so that
 // they log to the proper file path for the given service ID.
 func (m *Manager) configureLogDir(id string, cmd *exec.Cmd) error {
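The reset-after-work timer in Run above is the key concurrency detail of this change. As a minimal, self-contained sketch of that pattern (illustrative only, not part of the patch; the period and sleep durations are made up):

package main

import (
	"fmt"
	"time"
)

func main() {
	period := 100 * time.Millisecond
	timer := time.NewTimer(period)
	defer timer.Stop()

	done := time.After(1 * time.Second)
	for {
		select {
		case <-timer.C:
			// Simulate slow disk IO. With a time.Ticker, ticks would
			// keep firing on schedule and pile up behind this work.
			time.Sleep(250 * time.Millisecond)
			fmt.Println("snapshot complete")

			// Re-arm the timer only now, so the next run starts a full
			// period after this one finished and runs never overlap.
			timer.Reset(period)

		case <-done:
			return
		}
	}
}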
diff --git a/agent/proxy/manager_test.go b/agent/proxy/manager_test.go
index d9a817af34..28922cbfa8 100644
--- a/agent/proxy/manager_test.go
+++ b/agent/proxy/manager_test.go
@@ -261,9 +261,98 @@ func TestManagerRun_daemonPid(t *testing.T) {
 	require.NotEmpty(pidRaw)
 }
 
+// Test that Snapshot/Restore works.
+func TestManagerRun_snapshotRestore(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+	state := local.TestState(t)
+	m, closer := testManager(t)
+	defer closer()
+	m.State = state
+	defer m.Kill()
+
+	// Add the proxy
+	td, closer := testTempDir(t)
+	defer closer()
+	path := filepath.Join(td, "file")
+	testStateProxy(t, state, "web", helperProcess("start-stop", path))
+
+	// Set a low snapshot period so we get a snapshot
+	m.SnapshotPeriod = 10 * time.Millisecond
+
+	// Start the manager
+	go m.Run()
+
+	// We should see the path appear shortly
+	retry.Run(t, func(r *retry.R) {
+		_, err := os.Stat(path)
+		if err == nil {
+			return
+		}
+		r.Fatalf("error waiting for path: %s", err)
+	})
+
+	// Wait for the snapshot
+	snapPath := m.SnapshotPath()
+	retry.Run(t, func(r *retry.R) {
+		raw, err := ioutil.ReadFile(snapPath)
+		if err != nil {
+			r.Fatalf("error waiting for snapshot: %s", err)
+		}
+		if len(raw) < 30 {
+			r.Fatalf("snapshot too small")
+		}
+	})
+
+	// Stop the sync
+	require.NoError(m.Close())
+
+	// File should still exist
+	_, err := os.Stat(path)
+	require.NoError(err)
+
+	// Restore a manager from a snapshot
+	m2, closer := testManager(t)
+	m2.State = state
+	defer closer()
+	defer m2.Kill()
+	require.NoError(m2.Restore(snapPath))
+
+	// Start
+	go m2.Run()
+
+	// Add a second proxy so that we can determine when we're up
+	// and running.
+	path2 := filepath.Join(td, "file2")
+	testStateProxy(t, state, "db", helperProcess("start-stop", path2))
+	retry.Run(t, func(r *retry.R) {
+		_, err := os.Stat(path2)
+		if err == nil {
+			return
+		}
+		r.Fatalf("error waiting for path: %s", err)
+	})
+
+	// Kill m2, which should kill our main process
+	require.NoError(m2.Kill())
+
+	// File should no longer exist
+	retry.Run(t, func(r *retry.R) {
+		_, err := os.Stat(path)
+		if err != nil {
+			return
+		}
+		r.Fatalf("file still exists")
+	})
+}
+
 func testManager(t *testing.T) (*Manager, func()) {
 	m := NewManager()
 
+	// Set up a default state
+	m.State = local.TestState(t)
+
 	// Set these periods low to speed up tests
 	m.CoalescePeriod = 1 * time.Millisecond
 	m.QuiescentPeriod = 1 * time.Millisecond
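The test above also documents the intended lifecycle: Restore must run before Run, and Close stops the manager while leaving the daemon processes alive (only Kill stops them). A sketch of how a caller would wire this up, assuming an already-configured Manager; runProxyManager and its package are hypothetical, the Manager methods are the ones in this patch:

package agent

import "github.com/hashicorp/consul/agent/proxy"

// runProxyManager assumes m already has State, Logger, and DataDir set.
func runProxyManager(m *proxy.Manager) error {
	// Pick up any proxies persisted by a previous agent process. Per the
	// Restore docs, this must happen before Run.
	if path := m.SnapshotPath(); path != "" {
		if err := m.Restore(path); err != nil {
			return err
		}
	}

	// Run syncs with local state and, because DataDir is set, snapshots
	// every SnapshotPeriod (writing only when the state actually changed).
	go m.Run()

	// ... agent lifetime ...

	// Close stops the manager but leaves proxy processes running so a
	// restarted agent can Restore them; use Kill to stop them as well.
	return m.Close()
}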
diff --git a/agent/proxy/proxy.go b/agent/proxy/proxy.go
index e1bad92c0e..1bb88da8ea 100644
--- a/agent/proxy/proxy.go
+++ b/agent/proxy/proxy.go
@@ -7,6 +7,10 @@
 // for that is available in the "connect/proxy" package.
 package proxy
 
+import (
+	"github.com/hashicorp/consul/agent/structs"
+)
+
 // EnvProxyToken is the name of the environment variable that is passed
 // to managed proxies containing the proxy token.
 const EnvProxyToken = "CONNECT_PROXY_TOKEN"
@@ -16,6 +20,9 @@ const EnvProxyToken = "CONNECT_PROXY_TOKEN"
 // Calls to all the functions on this interface must be concurrency safe.
 // Please read the documentation carefully on top of each function for expected
 // behavior.
+//
+// Whenever a new proxy type is implemented, please also update proxyExecMode,
+// newProxyFromMode, and newProxy to support the new proxy.
 type Proxy interface {
 	// Start starts the proxy. If an error is returned then the managed
 	// proxy registration is rejected. Therefore, this should only fail if
@@ -56,3 +63,17 @@ type Proxy interface {
 	MarshalSnapshot() map[string]interface{}
 	UnmarshalSnapshot(map[string]interface{}) error
 }
+
+// proxyExecMode returns the ProxyExecMode for a Proxy instance.
+func proxyExecMode(p Proxy) structs.ProxyExecMode {
+	switch p.(type) {
+	case *Daemon:
+		return structs.ProxyExecModeDaemon
+
+	case *Noop:
+		return structs.ProxyExecModeTest
+
+	default:
+		return structs.ProxyExecModeUnspecified
+	}
+}
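The interface comment above asks future proxy types to keep proxyExecMode and newProxyFromMode in sync. A hypothetical regression test (not included in this patch) that would enforce the round-trip invariant in package proxy; "web" is just an arbitrary service ID:

package proxy

import (
	"testing"

	"github.com/hashicorp/consul/agent/structs"
)

// TestProxyExecMode_roundTrip checks that newProxyFromMode and
// proxyExecMode remain inverses for every supported mode.
func TestProxyExecMode_roundTrip(t *testing.T) {
	m := NewManager()
	for _, mode := range []structs.ProxyExecMode{
		structs.ProxyExecModeDaemon,
	} {
		p, err := m.newProxyFromMode(mode, "web")
		if err != nil {
			t.Fatalf("newProxyFromMode(%v): %s", mode, err)
		}
		if got := proxyExecMode(p); got != mode {
			t.Fatalf("round trip mismatch: got %v, want %v", got, mode)
		}
	}
}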
diff --git a/agent/proxy/snapshot.go b/agent/proxy/snapshot.go
index b119bfddf6..973a2f083d 100644
--- a/agent/proxy/snapshot.go
+++ b/agent/proxy/snapshot.go
@@ -1,7 +1,15 @@
 package proxy
 
 import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"reflect"
+
 	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/lib/file"
 )
 
 // snapshot is the structure of the snapshot file. This is unexported because
@@ -17,7 +25,7 @@ type snapshot struct {
 	Version int
 
 	// Proxies are the set of proxies that the manager has.
-	Proxies []snapshotProxy
+	Proxies map[string]snapshotProxy
 }
 
 // snapshotProxy represents a single proxy.
@@ -29,3 +37,127 @@ type snapshotProxy struct {
 	// implementation uses to restore state.
 	Config map[string]interface{}
 }
+
+// snapshotVersion is the current version to encode within the snapshot.
+const snapshotVersion = 1
+
+// SnapshotPath returns the default snapshot path for this manager. This
+// will return empty if DataDir is not set. This file may not exist yet.
+func (m *Manager) SnapshotPath() string {
+	if m.DataDir == "" {
+		return ""
+	}
+
+	return filepath.Join(m.DataDir, "snapshot.json")
+}
+
+// Snapshot will persist a snapshot of the proxy manager state that
+// can be restored with Restore.
+//
+// If DataDir is non-empty, then the Manager will automatically snapshot
+// whenever the set of managed proxies changes. This method generally doesn't
+// need to be called manually.
+func (m *Manager) Snapshot(path string) error {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	return m.snapshot(path, false)
+}
+
+// snapshot is the internal function analogous to Snapshot but expects
+// a lock to already be held.
+//
+// checkDup, when set, will store the snapshot on lastSnapshot and use
+// reflect.DeepEqual to verify that it's not writing an identical snapshot.
+func (m *Manager) snapshot(path string, checkDup bool) error {
+	// Build the snapshot
+	s := snapshot{
+		Version: snapshotVersion,
+		Proxies: make(map[string]snapshotProxy, len(m.proxies)),
+	}
+	for id, p := range m.proxies {
+		// Get the snapshot configuration. If the configuration is nil or
+		// empty then we don't persist this proxy.
+		config := p.MarshalSnapshot()
+		if len(config) == 0 {
+			continue
+		}
+
+		s.Proxies[id] = snapshotProxy{
+			Mode:   proxyExecMode(p),
+			Config: config,
+		}
+	}
+
+	// Dup detection: if the snapshot is identical to the last, do nothing
+	if checkDup && reflect.DeepEqual(m.lastSnapshot, &s) {
+		return nil
+	}
+
+	// Encode as JSON
+	encoded, err := json.Marshal(&s)
+	if err != nil {
+		return err
+	}
+
+	// Write the file
+	err = file.WriteAtomic(path, encoded)
+	if err == nil && checkDup {
+		m.lastSnapshot = &s
+	}
+	return err
+}
+
+// Restore restores the manager state from a snapshot at path. If path
+// doesn't exist, this does nothing and no error is returned.
+//
+// This restores proxy state but does not restore any Manager configuration
+// such as DataDir, Logger, etc. All of those should be set _before_ Restore
+// is called.
+//
+// Restore must be called before Run. Restore will immediately start
+// supervising the restored processes but will not sync with the local
+// state store until Run is called.
+//
+// If an error is returned the manager state is left untouched.
+func (m *Manager) Restore(path string) error {
+	buf, err := ioutil.ReadFile(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil
+		}
+
+		return err
+	}
+
+	var s snapshot
+	if err := json.Unmarshal(buf, &s); err != nil {
+		return err
+	}
+
+	// Verify the version matches so we can be more confident that we're
+	// decoding a structure that we expect.
+	if s.Version != snapshotVersion {
+		return fmt.Errorf("unknown snapshot version, expecting %d", snapshotVersion)
+	}
+
+	// Build the proxies from the snapshot
+	proxies := make(map[string]Proxy, len(s.Proxies))
+	for id, sp := range s.Proxies {
+		p, err := m.newProxyFromMode(sp.Mode, id)
+		if err != nil {
+			return err
+		}
+
+		if err := p.UnmarshalSnapshot(sp.Config); err != nil {
+			return err
+		}
+
+		proxies[id] = p
+	}
+
+	// Overwrite the proxies. The documentation notes that this will happen.
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	m.proxies = proxies
+	return nil
+}
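One design note on the write path above: snapshot relies on file.WriteAtomic so that a crash mid-write can never leave a truncated snapshot.json for Restore to read. A generic sketch of the usual write-to-temp-then-rename technique that such helpers follow (the standard pattern, not necessarily Consul's exact lib/file implementation); the rename is atomic on POSIX filesystems, so readers of path only ever see the old or the complete new contents:

package main

import (
	"os"
	"path/filepath"
)

func writeAtomic(path string, contents []byte) error {
	// Create the temp file in the same directory so the final rename
	// doesn't cross filesystems.
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
	if err != nil {
		return err
	}
	// Best-effort cleanup; harmless after a successful rename.
	defer os.Remove(tmp.Name())

	if _, err := tmp.Write(contents); err != nil {
		tmp.Close()
		return err
	}
	// Flush to disk before the rename so a crash can't leave an empty
	// file at the final path.
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

func main() {
	if err := writeAtomic("snapshot.json", []byte(`{"Version":1}`)); err != nil {
		panic(err)
	}
}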