agent/local: add Notify mechanism for proxy changes

pull/4275/head
Mitchell Hashimoto 2018-04-30 21:12:55 -07:00
parent 476ea7b04a
commit 2bd39a84a6
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
3 changed files with 98 additions and 79 deletions

View File

@ -1,46 +0,0 @@
package local
import (
"fmt"
"os/exec"
"github.com/hashicorp/consul/agent/proxy"
"github.com/hashicorp/consul/agent/structs"
)
// newProxyProcess returns the proxy.Proxy for the given ManagedProxy
// state entry. proxy.Proxy is the actual managed process. The returned value
// is the initialized struct but isn't explicitly started.
func (s *State) newProxyProcess(p *structs.ConnectManagedProxy, pToken string) (proxy.Proxy, error) {
switch p.ExecMode {
case structs.ProxyExecModeDaemon:
command := p.Command
if len(command) == 0 {
command = p.CommandDefault
}
// This should never happen since validation should happen upstream
// but verify it because the alternative is to panic below.
if len(command) == 0 {
return nil, fmt.Errorf("daemon mode managed proxy requires command")
}
// Build the command to execute.
var cmd exec.Cmd
cmd.Path = command[0]
cmd.Args = command[1:]
// Build the daemon structure
return &proxy.Daemon{
Command: &cmd,
ProxyToken: pToken,
Logger: s.logger,
}, nil
case structs.ProxyExecModeScript:
return &proxy.Noop{}, nil
default:
return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode)
}
}

View File

@ -14,7 +14,6 @@ import (
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/proxy"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
@ -127,9 +126,6 @@ type ManagedProxy struct {
// use service-scoped ACL tokens distributed externally. // use service-scoped ACL tokens distributed externally.
ProxyToken string ProxyToken string
// ProxyProcess is the managed proxy itself that is running.
ProxyProcess proxy.Proxy
// WatchCh is a close-only chan that is closed when the proxy is removed or // WatchCh is a close-only chan that is closed when the proxy is removed or
// updated. // updated.
WatchCh chan struct{} WatchCh chan struct{}
@ -187,19 +183,24 @@ type State struct {
// registration) do not appear here as the agent doesn't need to manage their // registration) do not appear here as the agent doesn't need to manage their
// process nor config. The _do_ still exist in services above though as // process nor config. The _do_ still exist in services above though as
// services with Kind == connect-proxy. // services with Kind == connect-proxy.
managedProxies map[string]*ManagedProxy //
// managedProxyHandlers is a map of registered channel listeners that
// are sent a message each time a proxy changes via Add or RemoveProxy.
managedProxies map[string]*ManagedProxy
managedProxyHandlers map[chan<- struct{}]struct{}
} }
// NewState creates a new local state for the agent. // NewState creates a new local state for the agent.
func NewState(c Config, lg *log.Logger, tokens *token.Store) *State { func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
l := &State{ l := &State{
config: c, config: c,
logger: lg, logger: lg,
services: make(map[string]*ServiceState), services: make(map[string]*ServiceState),
checks: make(map[types.CheckID]*CheckState), checks: make(map[types.CheckID]*CheckState),
metadata: make(map[string]string), metadata: make(map[string]string),
tokens: tokens, tokens: tokens,
managedProxies: make(map[string]*ManagedProxy), managedProxies: make(map[string]*ManagedProxy),
managedProxyHandlers: make(map[chan<- struct{}]struct{}),
} }
l.SetDiscardCheckOutput(c.DiscardCheckOutput) l.SetDiscardCheckOutput(c.DiscardCheckOutput)
return l return l
@ -577,22 +578,22 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
// AddProxy returns the newly added proxy, any replaced proxy, and an error. // AddProxy returns the newly added proxy, any replaced proxy, and an error.
// The second return value (replaced proxy) can be used to determine if // The second return value (replaced proxy) can be used to determine if
// the process needs to be updated or not. // the process needs to be updated or not.
func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*ManagedProxy, *ManagedProxy, error) { func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*ManagedProxy, error) {
if proxy == nil { if proxy == nil {
return nil, nil, fmt.Errorf("no proxy") return nil, fmt.Errorf("no proxy")
} }
// Lookup the local service // Lookup the local service
target := l.Service(proxy.TargetServiceID) target := l.Service(proxy.TargetServiceID)
if target == nil { if target == nil {
return nil, nil, fmt.Errorf("target service ID %s not registered", return nil, fmt.Errorf("target service ID %s not registered",
proxy.TargetServiceID) proxy.TargetServiceID)
} }
// Get bind info from config // Get bind info from config
cfg, err := proxy.ParseConfig() cfg, err := proxy.ParseConfig()
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
// Construct almost all of the NodeService that needs to be registered by the // Construct almost all of the NodeService that needs to be registered by the
@ -608,15 +609,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man
pToken, err := uuid.GenerateUUID() pToken, err := uuid.GenerateUUID()
if err != nil { if err != nil {
return nil, nil, err return nil, err
}
// Initialize the managed proxy process. This doesn't start anything,
// it only sets up the structures we'll use. To start the proxy, the
// caller should call Proxy and use the returned ManagedProxy instance.
proxyProcess, err := l.newProxyProcess(proxy, pToken)
if err != nil {
return nil, nil, err
} }
// Lock now. We can't lock earlier as l.Service would deadlock and shouldn't // Lock now. We can't lock earlier as l.Service would deadlock and shouldn't
@ -650,7 +643,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man
} }
// If no ports left (or auto ports disabled) fail // If no ports left (or auto ports disabled) fail
if svc.Port < 1 { if svc.Port < 1 {
return nil, nil, fmt.Errorf("no port provided for proxy bind_port and none "+ return nil, fmt.Errorf("no port provided for proxy bind_port and none "+
" left in the allocated range [%d, %d]", l.config.ProxyBindMinPort, " left in the allocated range [%d, %d]", l.config.ProxyBindMinPort,
l.config.ProxyBindMaxPort) l.config.ProxyBindMaxPort)
} }
@ -658,8 +651,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man
proxy.ProxyService = svc proxy.ProxyService = svc
// All set, add the proxy and return the service // All set, add the proxy and return the service
old, ok := l.managedProxies[svc.ID] if old, ok := l.managedProxies[svc.ID]; ok {
if ok {
// Notify watchers of the existing proxy config that it's changing. Note // Notify watchers of the existing proxy config that it's changing. Note
// this is safe here even before the map is updated since we still hold the // this is safe here even before the map is updated since we still hold the
// state lock and the watcher can't re-read the new config until we return // state lock and the watcher can't re-read the new config until we return
@ -667,14 +659,22 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*Man
close(old.WatchCh) close(old.WatchCh)
} }
l.managedProxies[svc.ID] = &ManagedProxy{ l.managedProxies[svc.ID] = &ManagedProxy{
Proxy: proxy, Proxy: proxy,
ProxyToken: pToken, ProxyToken: pToken,
ProxyProcess: proxyProcess, WatchCh: make(chan struct{}),
WatchCh: make(chan struct{}), }
// Notify
for ch := range l.managedProxyHandlers {
// Do not block
select {
case ch <- struct{}{}:
default:
}
} }
// No need to trigger sync as proxy state is local only. // No need to trigger sync as proxy state is local only.
return l.managedProxies[svc.ID], old, nil return l.managedProxies[svc.ID], nil
} }
// RemoveProxy is used to remove a proxy entry from the local state. // RemoveProxy is used to remove a proxy entry from the local state.
@ -692,6 +692,15 @@ func (l *State) RemoveProxy(id string) (*ManagedProxy, error) {
// Notify watchers of the existing proxy config that it's changed. // Notify watchers of the existing proxy config that it's changed.
close(p.WatchCh) close(p.WatchCh)
// Notify
for ch := range l.managedProxyHandlers {
// Do not block
select {
case ch <- struct{}{}:
default:
}
}
// No need to trigger sync as proxy state is local only. // No need to trigger sync as proxy state is local only.
return p, nil return p, nil
} }
@ -715,6 +724,27 @@ func (l *State) Proxies() map[string]*ManagedProxy {
return m return m
} }
// NotifyProxy will register a channel to receive messages when the
// configuration or set of proxies changes. This will not block on
// channel send so ensure the channel has a large enough buffer.
//
// NOTE(mitchellh): This could be more generalized but for my use case I
// only needed proxy events. In the future if it were to be generalized I
// would add a new Notify method and remove the proxy-specific ones.
func (l *State) NotifyProxy(ch chan<- struct{}) {
l.Lock()
defer l.Unlock()
l.managedProxyHandlers[ch] = struct{}{}
}
// StopNotifyProxy will deregister a channel receiving proxy notifications.
// Pair this with all calls to NotifyProxy to clean up state.
func (l *State) StopNotifyProxy(ch chan<- struct{}) {
l.Lock()
defer l.Unlock()
delete(l.managedProxyHandlers, ch)
}
// Metadata returns the local node metadata fields that the // Metadata returns the local node metadata fields that the
// agent is aware of and are being kept in sync with the server // agent is aware of and are being kept in sync with the server
func (l *State) Metadata() map[string]string { func (l *State) Metadata() map[string]string {

View File

@ -1737,6 +1737,13 @@ func TestStateProxyManagement(t *testing.T) {
assert.Equal(svc.Port, svcDup.Port) assert.Equal(svc.Port, svcDup.Port)
} }
// Let's register a notifier now
notifyCh := make(chan struct{}, 1)
state.NotifyProxy(notifyCh)
defer state.StopNotifyProxy(notifyCh)
assert.Empty(notifyCh)
drainCh(notifyCh)
// Second proxy should claim other port // Second proxy should claim other port
p2 := p1 p2 := p1
p2.TargetServiceID = "cache" p2.TargetServiceID = "cache"
@ -1746,6 +1753,10 @@ func TestStateProxyManagement(t *testing.T) {
assert.Contains([]int{20000, 20001}, svc2.Port) assert.Contains([]int{20000, 20001}, svc2.Port)
assert.NotEqual(svc.Port, svc2.Port) assert.NotEqual(svc.Port, svc2.Port)
// Should have a notification
assert.NotEmpty(notifyCh)
drainCh(notifyCh)
// Store this for later // Store this for later
p2token := state.Proxy(svc2.ID).ProxyToken p2token := state.Proxy(svc2.ID).ProxyToken
@ -1755,6 +1766,9 @@ func TestStateProxyManagement(t *testing.T) {
_, err = state.AddProxy(&p3, "fake-token") _, err = state.AddProxy(&p3, "fake-token")
require.Error(err) require.Error(err)
// Should have a notification but we'll do nothing so that the next
// receive should block (we set cap == 1 above)
// But if we set a port explicitly it should be OK // But if we set a port explicitly it should be OK
p3.Config = map[string]interface{}{ p3.Config = map[string]interface{}{
"bind_port": 1234, "bind_port": 1234,
@ -1766,6 +1780,10 @@ func TestStateProxyManagement(t *testing.T) {
require.Equal("0.0.0.0", svc3.Address) require.Equal("0.0.0.0", svc3.Address)
require.Equal(1234, svc3.Port) require.Equal(1234, svc3.Port)
// Should have a notification
assert.NotEmpty(notifyCh)
drainCh(notifyCh)
// Update config of an already registered proxy should work // Update config of an already registered proxy should work
p3updated := p3 p3updated := p3
p3updated.Config["foo"] = "bar" p3updated.Config["foo"] = "bar"
@ -1785,10 +1803,16 @@ func TestStateProxyManagement(t *testing.T) {
assert.False(ws.Watch(time.After(500*time.Millisecond)), assert.False(ws.Watch(time.After(500*time.Millisecond)),
"watch should have fired so ws.Watch should not timeout") "watch should have fired so ws.Watch should not timeout")
drainCh(notifyCh)
// Remove one of the auto-assigned proxies // Remove one of the auto-assigned proxies
_, err = state.RemoveProxy(svc2.ID) _, err = state.RemoveProxy(svc2.ID)
require.NoError(err) require.NoError(err)
// Should have a notification
assert.NotEmpty(notifyCh)
drainCh(notifyCh)
// Should be able to create a new proxy for that service with the port (it // Should be able to create a new proxy for that service with the port (it
// should have been "freed"). // should have been "freed").
p4 := p2 p4 := p2
@ -1829,3 +1853,14 @@ func TestStateProxyManagement(t *testing.T) {
} }
} }
} }
// drainCh drains a channel by reading messages until it would block.
func drainCh(ch chan struct{}) {
for {
select {
case <-ch:
default:
return
}
}
}