Fixs a few issues that stopped this working in real life but not caught by tests:

- Dev mode assumed no persistence of services although proxy state is persisted which caused proxies to be killed on startup as their services were no longer registered. Fixed.
 - Didn't snapshot the ProxyID which meant that proxies were adopted OK from snapshot but failed to restart if they died since there was no proxyID in the ENV on restart
 - Dev mode with no persistence just kills all proxies on shutdown since it can't recover them later
 - Naming things
pull/4275/head
Paul Banks 2018-06-06 21:04:19 +01:00 committed by Jack Pearkes
parent 77a8003475
commit d1c67d90bc
7 changed files with 66 additions and 22 deletions

View File

@ -1104,7 +1104,7 @@ func (a *Agent) setupNodeID(config *config.RuntimeConfig) error {
} }
// For dev mode we have no filesystem access so just make one. // For dev mode we have no filesystem access so just make one.
if a.config.DevMode { if a.config.DataDir == "" {
id, err := a.makeNodeID() id, err := a.makeNodeID()
if err != nil { if err != nil {
return err return err
@ -1320,8 +1320,17 @@ func (a *Agent) ShutdownAgent() error {
// Stop the proxy manager // Stop the proxy manager
if a.proxyManager != nil { if a.proxyManager != nil {
if err := a.proxyManager.Close(); err != nil { // If persistence is disabled (implies DevMode but a subset of DevMode) then
a.logger.Printf("[WARN] agent: error shutting down proxy manager: %s", err) // don't leave the proxies running since the agent will not be able to
// recover them later.
if a.config.DataDir == "" {
if err := a.proxyManager.Kill(); err != nil {
a.logger.Printf("[WARN] agent: error shutting down proxy manager: %s", err)
}
} else {
if err := a.proxyManager.Close(); err != nil {
a.logger.Printf("[WARN] agent: error shutting down proxy manager: %s", err)
}
} }
} }
@ -1720,7 +1729,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che
a.State.AddService(service, token) a.State.AddService(service, token)
// Persist the service to a file // Persist the service to a file
if persist && !a.config.DevMode { if persist && a.config.DataDir != "" {
if err := a.persistService(service); err != nil { if err := a.persistService(service); err != nil {
return err return err
} }
@ -2019,7 +2028,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
// Persist the check // Persist the check
if persist && !a.config.DevMode { if persist && a.config.DataDir != "" {
return a.persistCheck(check, chkType) return a.persistCheck(check, chkType)
} }
@ -2118,7 +2127,7 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,
} }
// Persist the proxy // Persist the proxy
if persist && !a.config.DevMode { if persist && a.config.DataDir != "" {
return a.persistProxy(proxyState) return a.persistProxy(proxyState)
} }
return nil return nil
@ -2177,7 +2186,7 @@ func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
return err return err
} }
if persist && !a.config.DevMode { if persist && a.config.DataDir != "" {
return a.purgeProxy(proxyID) return a.purgeProxy(proxyID)
} }
@ -2298,7 +2307,7 @@ func (a *Agent) updateTTLCheck(checkID types.CheckID, status, output string) err
check.SetStatus(status, output) check.SetStatus(status, output)
// We don't write any files in dev mode so bail here. // We don't write any files in dev mode so bail here.
if a.config.DevMode { if a.config.DataDir == "" {
return nil return nil
} }

View File

@ -34,9 +34,9 @@ type Daemon struct {
// be a Cmd that isn't yet started. // be a Cmd that isn't yet started.
Command *exec.Cmd Command *exec.Cmd
// ProxyId is the ID of the proxy service. This is required for API // ProxyID is the ID of the proxy service. This is required for API
// requests (along with the token) and is passed via env var. // requests (along with the token) and is passed via env var.
ProxyId string ProxyID string
// ProxyToken is the special local-only ACL token that allows a proxy // ProxyToken is the special local-only ACL token that allows a proxy
// to communicate to the Connect-specific endpoints. // to communicate to the Connect-specific endpoints.
@ -121,6 +121,11 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
if !attemptsDeadline.IsZero() && time.Now().After(attemptsDeadline) { if !attemptsDeadline.IsZero() && time.Now().After(attemptsDeadline) {
attempts = 0 attempts = 0
} }
// Set ourselves a deadline - we have to make it at least this long before
// we come around the loop to consider it to have been a "successful"
// daemon startup and rest the counter above. Note that if the daemon
// fails before this, we reset the deadline to zero below so that backoff
// sleeps in the loop don't count as "success" time.
attemptsDeadline = time.Now().Add(DaemonRestartHealthy) attemptsDeadline = time.Now().Add(DaemonRestartHealthy)
attempts++ attempts++
@ -136,6 +141,10 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
} }
if waitTime > 0 { if waitTime > 0 {
// If we are waiting, reset the success deadline so we don't
// accidentally interpret backoff sleep as successful runtime.
attemptsDeadline = time.Time{}
p.Logger.Printf( p.Logger.Printf(
"[WARN] agent/proxy: waiting %s before restarting daemon", "[WARN] agent/proxy: waiting %s before restarting daemon",
waitTime) waitTime)
@ -224,15 +233,21 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
func (p *Daemon) start() (*os.Process, error) { func (p *Daemon) start() (*os.Process, error) {
cmd := *p.Command cmd := *p.Command
// Add the proxy token to the environment. We first copy the env because // Add the proxy token to the environment. We first copy the env because it is
// it is a slice and therefore the "copy" above will only copy the slice // a slice and therefore the "copy" above will only copy the slice reference.
// reference. We allocate an exactly sized slice. // We allocate an exactly sized slice.
cmd.Env = make([]string, len(p.Command.Env), len(p.Command.Env)+1) //
// Note that anything we add to the Env here is NOT persisted in the snapshot
// which only looks at p.Command.Env so it needs to be reconstructible exactly
// from data in the snapshot otherwise.
cmd.Env = make([]string, len(p.Command.Env), len(p.Command.Env)+2)
copy(cmd.Env, p.Command.Env) copy(cmd.Env, p.Command.Env)
cmd.Env = append(cmd.Env, cmd.Env = append(cmd.Env,
fmt.Sprintf("%s=%s", EnvProxyId, p.ProxyId), fmt.Sprintf("%s=%s", EnvProxyID, p.ProxyID),
fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken)) fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken))
// Update the Daemon env
// Args must always contain a 0 entry which is usually the executed binary. // Args must always contain a 0 entry which is usually the executed binary.
// To be safe and a bit more robust we default this, but only to prevent // To be safe and a bit more robust we default this, but only to prevent
// a panic below. // a panic below.
@ -366,6 +381,7 @@ func (p *Daemon) Equal(raw Proxy) bool {
// We compare equality on a subset of the command configuration // We compare equality on a subset of the command configuration
return p.ProxyToken == p2.ProxyToken && return p.ProxyToken == p2.ProxyToken &&
p.ProxyID == p2.ProxyID &&
p.Command.Path == p2.Command.Path && p.Command.Path == p2.Command.Path &&
p.Command.Dir == p2.Command.Dir && p.Command.Dir == p2.Command.Dir &&
reflect.DeepEqual(p.Command.Args, p2.Command.Args) && reflect.DeepEqual(p.Command.Args, p2.Command.Args) &&
@ -389,6 +405,7 @@ func (p *Daemon) MarshalSnapshot() map[string]interface{} {
"CommandDir": p.Command.Dir, "CommandDir": p.Command.Dir,
"CommandEnv": p.Command.Env, "CommandEnv": p.Command.Env,
"ProxyToken": p.ProxyToken, "ProxyToken": p.ProxyToken,
"ProxyID": p.ProxyID,
} }
} }
@ -404,6 +421,7 @@ func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error {
// Set the basic fields // Set the basic fields
p.ProxyToken = s.ProxyToken p.ProxyToken = s.ProxyToken
p.ProxyID = s.ProxyID
p.Command = &exec.Cmd{ p.Command = &exec.Cmd{
Path: s.CommandPath, Path: s.CommandPath,
Args: s.CommandArgs, Args: s.CommandArgs,
@ -435,7 +453,7 @@ func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error {
// within the manager snapshot and is restored automatically. // within the manager snapshot and is restored automatically.
type daemonSnapshot struct { type daemonSnapshot struct {
// Pid of the process. This is the only value actually required to // Pid of the process. This is the only value actually required to
// regain mangement control. The remainder values are for Equal. // regain management control. The remainder values are for Equal.
Pid int Pid int
// Command information // Command information
@ -448,4 +466,6 @@ type daemonSnapshot struct {
// store the hash of the token but for now we need the full token in // store the hash of the token but for now we need the full token in
// case the process dies and has to be restarted. // case the process dies and has to be restarted.
ProxyToken string ProxyToken string
ProxyID string
} }

View File

@ -32,7 +32,7 @@ func TestDaemonStartStop(t *testing.T) {
d := &Daemon{ d := &Daemon{
Command: helperProcess("start-stop", path), Command: helperProcess("start-stop", path),
ProxyId: "tubes", ProxyID: "tubes",
ProxyToken: uuid, ProxyToken: uuid,
Logger: testLogger, Logger: testLogger,
} }
@ -495,6 +495,19 @@ func TestDaemonEqual(t *testing.T) {
true, true,
}, },
{
"Different proxy ID",
&Daemon{
Command: &exec.Cmd{Path: "/foo"},
ProxyID: "web",
},
&Daemon{
Command: &exec.Cmd{Path: "/foo"},
ProxyID: "db",
},
false,
},
{ {
"Different path", "Different path",
&Daemon{ &Daemon{
@ -568,6 +581,7 @@ func TestDaemonMarshalSnapshot(t *testing.T) {
"basic", "basic",
&Daemon{ &Daemon{
Command: &exec.Cmd{Path: "/foo"}, Command: &exec.Cmd{Path: "/foo"},
ProxyID: "web",
process: &os.Process{Pid: 42}, process: &os.Process{Pid: 42},
}, },
map[string]interface{}{ map[string]interface{}{
@ -577,6 +591,7 @@ func TestDaemonMarshalSnapshot(t *testing.T) {
"CommandDir": "", "CommandDir": "",
"CommandEnv": []string(nil), "CommandEnv": []string(nil),
"ProxyToken": "", "ProxyToken": "",
"ProxyID": "web",
}, },
}, },
} }

View File

@ -423,7 +423,7 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
// Build the daemon structure // Build the daemon structure
proxy.Command = &cmd proxy.Command = &cmd
proxy.ProxyId = id proxy.ProxyID = id
proxy.ProxyToken = mp.ProxyToken proxy.ProxyToken = mp.ProxyToken
return proxy, nil return proxy, nil

View File

@ -12,10 +12,10 @@ import (
) )
const ( const (
// EnvProxyId is the name of the environment variable that is set for // EnvProxyID is the name of the environment variable that is set for
// managed proxies containing the proxy service ID. This is required along // managed proxies containing the proxy service ID. This is required along
// with the token to make API requests related to the proxy. // with the token to make API requests related to the proxy.
EnvProxyId = "CONNECT_PROXY_ID" EnvProxyID = "CONNECT_PROXY_ID"
// EnvProxyToken is the name of the environment variable that is passed // EnvProxyToken is the name of the environment variable that is passed
// to managed proxies containing the proxy token. // to managed proxies containing the proxy token.

View File

@ -81,7 +81,7 @@ func TestHelperProcess(t *testing.T) {
path := args[0] path := args[0]
var data []byte var data []byte
data = append(data, []byte(os.Getenv(EnvProxyId))...) data = append(data, []byte(os.Getenv(EnvProxyID))...)
data = append(data, ':') data = append(data, ':')
data = append(data, []byte(os.Getenv(EnvProxyToken))...) data = append(data, []byte(os.Getenv(EnvProxyToken))...)

View File

@ -125,7 +125,7 @@ func (c *cmd) Run(args []string) int {
// Load the proxy ID and token from env vars if they're set // Load the proxy ID and token from env vars if they're set
if c.proxyID == "" { if c.proxyID == "" {
c.proxyID = os.Getenv(proxyAgent.EnvProxyId) c.proxyID = os.Getenv(proxyAgent.EnvProxyID)
} }
if c.http.Token() == "" { if c.http.Token() == "" {
c.http.SetToken(os.Getenv(proxyAgent.EnvProxyToken)) c.http.SetToken(os.Getenv(proxyAgent.EnvProxyToken))