mirror of https://github.com/hashicorp/consul
Persist proxies from config files
Also change how loadProxies works. Now it will load all persisted proxies into a map, then when loading config file proxies will look up the previous proxy token in that map.pull/4407/head
parent
b3160ba22d
commit
3fe5f566f2
193
agent/agent.go
193
agent/agent.go
|
@ -208,6 +208,9 @@ type Agent struct {
|
|||
|
||||
// proxyManager is the proxy process manager for managed Connect proxies.
|
||||
proxyManager *proxy.Manager
|
||||
|
||||
// proxyLock protects proxy information in the local state from concurrent modification
|
||||
proxyLock sync.Mutex
|
||||
}
|
||||
|
||||
func New(c *config.RuntimeConfig) (*Agent, error) {
|
||||
|
@ -1616,16 +1619,21 @@ func (a *Agent) purgeService(serviceID string) error {
|
|||
type persistedProxy struct {
|
||||
ProxyToken string
|
||||
Proxy *structs.ConnectManagedProxy
|
||||
|
||||
// Set to true when the proxy information originated from the agents configuration
|
||||
// as opposed to API registration.
|
||||
FromFile bool
|
||||
}
|
||||
|
||||
// persistProxy saves a proxy definition to a JSON file in the data dir
|
||||
func (a *Agent) persistProxy(proxy *local.ManagedProxy) error {
|
||||
func (a *Agent) persistProxy(proxy *local.ManagedProxy, FromFile bool) error {
|
||||
proxyPath := filepath.Join(a.config.DataDir, proxyDir,
|
||||
stringHash(proxy.Proxy.ProxyService.ID))
|
||||
|
||||
wrapped := persistedProxy{
|
||||
ProxyToken: proxy.ProxyToken,
|
||||
Proxy: proxy.Proxy,
|
||||
FromFile: FromFile,
|
||||
}
|
||||
encoded, err := json.Marshal(wrapped)
|
||||
if err != nil {
|
||||
|
@ -2076,7 +2084,9 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// AddProxy adds a new local Connect Proxy instance to be managed by the agent.
|
||||
// addProxyLocked adds a new local Connect Proxy instance to be managed by the agent.
|
||||
//
|
||||
// This assumes that the agent's proxyLock is already held
|
||||
//
|
||||
// It REQUIRES that the service that is being proxied is already present in the
|
||||
// local state. Note that this is only used for agent-managed proxies so we can
|
||||
|
@ -2091,7 +2101,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
|
|||
// definitions from disk; new proxies must leave it blank to get a new token
|
||||
// assigned. We need to restore from disk to enable to continue authenticating
|
||||
// running proxies that already had that credential injected.
|
||||
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,
|
||||
func (a *Agent) addProxyLocked(proxy *structs.ConnectManagedProxy, persist, FromFile bool,
|
||||
restoredProxyToken string) error {
|
||||
// Lookup the target service token in state if there is one.
|
||||
token := a.State.ServiceToken(proxy.TargetServiceID)
|
||||
|
@ -2143,11 +2153,33 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,
|
|||
|
||||
// Persist the proxy
|
||||
if persist && a.config.DataDir != "" {
|
||||
return a.persistProxy(proxyState)
|
||||
return a.persistProxy(proxyState, FromFile)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// addProxyLocked adds a new local Connect Proxy instance to be managed by the agent.
|
||||
//
|
||||
// It REQUIRES that the service that is being proxied is already present in the
|
||||
// local state. Note that this is only used for agent-managed proxies so we can
|
||||
// ensure that we always make this true. For externally managed and registered
|
||||
// proxies we explicitly allow the proxy to be registered first to make
|
||||
// bootstrap ordering of a new service simpler but the same is not true here
|
||||
// since this is only ever called when setting up a _managed_ proxy which was
|
||||
// registered as part of a service registration either from config or HTTP API
|
||||
// call.
|
||||
//
|
||||
// The restoredProxyToken argument should only be used when restoring proxy
|
||||
// definitions from disk; new proxies must leave it blank to get a new token
|
||||
// assigned. We need to restore from disk to enable to continue authenticating
|
||||
// running proxies that already had that credential injected.
|
||||
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist, FromFile bool,
|
||||
restoredProxyToken string) error {
|
||||
a.proxyLock.Lock()
|
||||
defer a.proxyLock.Unlock()
|
||||
return a.addProxyLocked(proxy, persist, FromFile, restoredProxyToken)
|
||||
}
|
||||
|
||||
// resolveProxyCheckAddress returns the best address to use for a TCP check of
|
||||
// the proxy's public listener. It expects the input to already have default
|
||||
// values populated by applyProxyConfigDefaults. It may return an empty string
|
||||
|
@ -2290,8 +2322,10 @@ func (a *Agent) applyProxyDefaults(proxy *structs.ConnectManagedProxy) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// RemoveProxy stops and removes a local proxy instance.
|
||||
func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
|
||||
// removeProxyLocked stops and removes a local proxy instance.
|
||||
//
|
||||
// It is assumed that this function is called while holding the proxyLock already
|
||||
func (a *Agent) removeProxyLocked(proxyID string, persist bool) error {
|
||||
// Validate proxyID
|
||||
if proxyID == "" {
|
||||
return fmt.Errorf("proxyID missing")
|
||||
|
@ -2316,6 +2350,13 @@ func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// RemoveProxy stops and removes a local proxy instance.
|
||||
func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
|
||||
a.proxyLock.Lock()
|
||||
defer a.proxyLock.Unlock()
|
||||
return a.removeProxyLocked(proxyID, persist)
|
||||
}
|
||||
|
||||
// verifyProxyToken takes a token and attempts to verify it against the
|
||||
// targetService name. If targetProxy is specified, then the local proxy token
|
||||
// must exactly match the given proxy ID. cert, config, etc.).
|
||||
|
@ -2782,9 +2823,67 @@ func (a *Agent) unloadChecks() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// loadPersistedProxies will load connect proxy definitions from their
|
||||
// persisted state on disk and return a slice of them
|
||||
//
|
||||
// This does not add them to the local
|
||||
func (a *Agent) loadPersistedProxies() (map[string]persistedProxy, error) {
|
||||
persistedProxies := make(map[string]persistedProxy)
|
||||
|
||||
proxyDir := filepath.Join(a.config.DataDir, proxyDir)
|
||||
files, err := ioutil.ReadDir(proxyDir)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("Failed reading proxies dir %q: %s", proxyDir, err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, fi := range files {
|
||||
// Skip all dirs
|
||||
if fi.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip all partially written temporary files
|
||||
if strings.HasSuffix(fi.Name(), "tmp") {
|
||||
return nil, fmt.Errorf("Ignoring temporary proxy file %v", fi.Name())
|
||||
}
|
||||
|
||||
// Open the file for reading
|
||||
file := filepath.Join(proxyDir, fi.Name())
|
||||
fh, err := os.Open(file)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed opening proxy file %q: %s", file, err)
|
||||
}
|
||||
|
||||
// Read the contents into a buffer
|
||||
buf, err := ioutil.ReadAll(fh)
|
||||
fh.Close()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed reading proxy file %q: %s", file, err)
|
||||
}
|
||||
|
||||
// Try decoding the proxy definition
|
||||
var p persistedProxy
|
||||
if err := json.Unmarshal(buf, &p); err != nil {
|
||||
return nil, fmt.Errorf("Failed decoding proxy file %q: %s", file, err)
|
||||
}
|
||||
svcID := p.Proxy.TargetServiceID
|
||||
|
||||
persistedProxies[svcID] = p
|
||||
}
|
||||
|
||||
return persistedProxies, nil
|
||||
}
|
||||
|
||||
// loadProxies will load connect proxy definitions from configuration and
|
||||
// persisted definitions on disk, and load them into the local agent.
|
||||
func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
|
||||
a.proxyLock.Lock()
|
||||
defer a.proxyLock.Unlock()
|
||||
|
||||
persistedProxies, persistenceErr := a.loadPersistedProxies()
|
||||
|
||||
for _, svc := range conf.Services {
|
||||
if svc.Connect != nil {
|
||||
proxy, err := svc.ConnectManagedProxy()
|
||||
|
@ -2794,78 +2893,46 @@ func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
|
|||
if proxy == nil {
|
||||
continue
|
||||
}
|
||||
if err := a.AddProxy(proxy, false, ""); err != nil {
|
||||
restoredToken := ""
|
||||
if persisted, ok := persistedProxies[proxy.TargetServiceID]; ok {
|
||||
restoredToken = persisted.ProxyToken
|
||||
}
|
||||
|
||||
if err := a.addProxyLocked(proxy, true, true, restoredToken); err != nil {
|
||||
return fmt.Errorf("failed adding proxy: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Load any persisted proxies
|
||||
proxyDir := filepath.Join(a.config.DataDir, proxyDir)
|
||||
files, err := ioutil.ReadDir(proxyDir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Failed reading proxies dir %q: %s", proxyDir, err)
|
||||
}
|
||||
for _, fi := range files {
|
||||
// Skip all dirs
|
||||
if fi.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip all partially written temporary files
|
||||
if strings.HasSuffix(fi.Name(), "tmp") {
|
||||
a.logger.Printf("[WARN] agent: Ignoring temporary proxy file %v", fi.Name())
|
||||
continue
|
||||
}
|
||||
|
||||
// Open the file for reading
|
||||
file := filepath.Join(proxyDir, fi.Name())
|
||||
fh, err := os.Open(file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed opening proxy file %q: %s", file, err)
|
||||
}
|
||||
|
||||
// Read the contents into a buffer
|
||||
buf, err := ioutil.ReadAll(fh)
|
||||
fh.Close()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed reading proxy file %q: %s", file, err)
|
||||
}
|
||||
|
||||
// Try decoding the proxy definition
|
||||
var p persistedProxy
|
||||
if err := json.Unmarshal(buf, &p); err != nil {
|
||||
a.logger.Printf("[ERR] agent: Failed decoding proxy file %q: %s", file, err)
|
||||
continue
|
||||
}
|
||||
proxyID := p.Proxy.ProxyService.ID
|
||||
|
||||
if a.State.Proxy(proxyID) != nil {
|
||||
// Purge previously persisted proxy. This allows config to be preferred
|
||||
// over services persisted from the API.
|
||||
a.logger.Printf("[DEBUG] agent: proxy %q exists, not restoring from %q",
|
||||
proxyID, file)
|
||||
for _, persisted := range persistedProxies {
|
||||
proxyID := persisted.Proxy.ProxyService.ID
|
||||
if persisted.FromFile && a.State.Proxy(proxyID) == nil {
|
||||
// Purge proxies that were configured previously but are no longer in the config
|
||||
a.logger.Printf("[DEBUG] agent: purging stale persisted proxy %q", proxyID)
|
||||
if err := a.purgeProxy(proxyID); err != nil {
|
||||
return fmt.Errorf("failed purging proxy %q: %s", proxyID, err)
|
||||
return fmt.Errorf("failed purging proxy %q: %v", proxyID, err)
|
||||
}
|
||||
} else {
|
||||
a.logger.Printf("[DEBUG] agent: restored proxy definition %q from %q",
|
||||
proxyID, file)
|
||||
if err := a.AddProxy(p.Proxy, false, p.ProxyToken); err != nil {
|
||||
return fmt.Errorf("failed adding proxy %q: %s", proxyID, err)
|
||||
} else if !persisted.FromFile {
|
||||
if a.State.Proxy(proxyID) == nil {
|
||||
a.logger.Printf("[DEBUG] agent: restored proxy definition %q", proxyID)
|
||||
if err := a.addProxyLocked(persisted.Proxy, false, false, persisted.ProxyToken); err != nil {
|
||||
return fmt.Errorf("failed adding proxy %q: %v", proxyID, err)
|
||||
}
|
||||
} else {
|
||||
a.logger.Printf("[WARN] agent: proxy definition %q was overwritten by a proxy definition within a config file", proxyID)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
return persistenceErr
|
||||
}
|
||||
|
||||
// unloadProxies will deregister all proxies known to the local agent.
|
||||
func (a *Agent) unloadProxies() error {
|
||||
a.proxyLock.Lock()
|
||||
defer a.proxyLock.Unlock()
|
||||
for id := range a.State.Proxies() {
|
||||
if err := a.RemoveProxy(id, false); err != nil {
|
||||
if err := a.removeProxyLocked(id, false); err != nil {
|
||||
return fmt.Errorf("Failed deregistering proxy '%s': %s", id, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -637,7 +637,7 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
|
|||
}
|
||||
// Add proxy (which will add proxy service so do it before we trigger sync)
|
||||
if proxy != nil {
|
||||
if err := s.agent.AddProxy(proxy, true, ""); err != nil {
|
||||
if err := s.agent.AddProxy(proxy, true, false, ""); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
|
@ -1383,12 +1384,12 @@ func TestAgent_PersistProxy(t *testing.T) {
|
|||
file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy"))
|
||||
|
||||
// Proxy is not persisted unless requested
|
||||
require.NoError(a.AddProxy(proxy, false, ""))
|
||||
require.NoError(a.AddProxy(proxy, false, false, ""))
|
||||
_, err := os.Stat(file)
|
||||
require.Error(err, "proxy should not be persisted")
|
||||
|
||||
// Proxy is persisted if requested
|
||||
require.NoError(a.AddProxy(proxy, true, ""))
|
||||
require.NoError(a.AddProxy(proxy, true, false, ""))
|
||||
_, err = os.Stat(file)
|
||||
require.NoError(err, "proxy should be persisted")
|
||||
|
||||
|
@ -1404,7 +1405,7 @@ func TestAgent_PersistProxy(t *testing.T) {
|
|||
proxy.Config = map[string]interface{}{
|
||||
"foo": "bar",
|
||||
}
|
||||
require.NoError(a.AddProxy(proxy, true, ""))
|
||||
require.NoError(a.AddProxy(proxy, true, false, ""))
|
||||
|
||||
content, err = ioutil.ReadFile(file)
|
||||
require.NoError(err)
|
||||
|
@ -1451,7 +1452,7 @@ func TestAgent_PurgeProxy(t *testing.T) {
|
|||
Command: []string{"/bin/sleep", "3600"},
|
||||
}
|
||||
proxyID := "redis-proxy"
|
||||
require.NoError(a.AddProxy(proxy, true, ""))
|
||||
require.NoError(a.AddProxy(proxy, true, false, ""))
|
||||
|
||||
file := filepath.Join(a.Config.DataDir, proxyDir, stringHash("redis-proxy"))
|
||||
|
||||
|
@ -1461,7 +1462,7 @@ func TestAgent_PurgeProxy(t *testing.T) {
|
|||
require.NoError(err, "should not be removed")
|
||||
|
||||
// Re-add the proxy
|
||||
require.NoError(a.AddProxy(proxy, true, ""))
|
||||
require.NoError(a.AddProxy(proxy, true, false, ""))
|
||||
|
||||
// Removed
|
||||
require.NoError(a.RemoveProxy(proxyID, true))
|
||||
|
@ -1499,7 +1500,7 @@ func TestAgent_PurgeProxyOnDuplicate(t *testing.T) {
|
|||
Command: []string{"/bin/sleep", "3600"},
|
||||
}
|
||||
proxyID := "redis-proxy"
|
||||
require.NoError(a.AddProxy(proxy, true, ""))
|
||||
require.NoError(a.AddProxy(proxy, true, false, ""))
|
||||
|
||||
a.Shutdown()
|
||||
|
||||
|
@ -2753,7 +2754,7 @@ func TestAgent_AddProxy(t *testing.T) {
|
|||
}
|
||||
require.NoError(a.AddService(reg, nil, false, ""))
|
||||
|
||||
err := a.AddProxy(tt.proxy, false, "")
|
||||
err := a.AddProxy(tt.proxy, false, false, "")
|
||||
if tt.wantErr {
|
||||
require.Error(err)
|
||||
return
|
||||
|
@ -2813,7 +2814,7 @@ func TestAgent_RemoveProxy(t *testing.T) {
|
|||
ExecMode: structs.ProxyExecModeDaemon,
|
||||
Command: []string{"foo"},
|
||||
}
|
||||
require.NoError(a.AddProxy(pReg, false, ""))
|
||||
require.NoError(a.AddProxy(pReg, false, false, ""))
|
||||
|
||||
// Test the ID was created as we expect.
|
||||
gotProxy := a.State.Proxy("web-proxy")
|
||||
|
@ -2830,3 +2831,61 @@ func TestAgent_RemoveProxy(t *testing.T) {
|
|||
err = a.RemoveProxy("foobar", false)
|
||||
require.Error(err)
|
||||
}
|
||||
|
||||
func TestAgent_ReLoadProxiesFromConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := NewTestAgent(t.Name(),
|
||||
`node_name = "node1"
|
||||
`)
|
||||
defer a.Shutdown()
|
||||
require := require.New(t)
|
||||
|
||||
// Register a target service we can use
|
||||
reg := &structs.NodeService{
|
||||
Service: "web",
|
||||
Port: 8080,
|
||||
}
|
||||
require.NoError(a.AddService(reg, nil, false, ""))
|
||||
|
||||
proxies := a.State.Proxies()
|
||||
require.Len(proxies, 0)
|
||||
|
||||
config := config.RuntimeConfig{
|
||||
Services: []*structs.ServiceDefinition{
|
||||
&structs.ServiceDefinition{
|
||||
Name: "web",
|
||||
Connect: &structs.ServiceConnect{
|
||||
Native: false,
|
||||
Proxy: &structs.ServiceDefinitionConnectProxy{},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(a.loadProxies(&config))
|
||||
|
||||
// ensure we loaded the proxy
|
||||
proxies = a.State.Proxies()
|
||||
require.Len(proxies, 1)
|
||||
|
||||
// store the auto-generated token
|
||||
ptok := ""
|
||||
pid := ""
|
||||
for id := range proxies {
|
||||
pid = id
|
||||
ptok = proxies[id].ProxyToken
|
||||
break
|
||||
}
|
||||
|
||||
// reload the proxies and ensure the proxy token is the same
|
||||
require.NoError(a.unloadProxies())
|
||||
require.NoError(a.loadProxies(&config))
|
||||
require.Len(proxies, 1)
|
||||
require.Equal(ptok, proxies[pid].ProxyToken)
|
||||
|
||||
// make sure when the config goes away so does the proxy
|
||||
require.NoError(a.unloadProxies())
|
||||
// a.config contains no services or proxies
|
||||
require.NoError(a.loadProxies(a.config))
|
||||
require.Len(proxies, 0)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue