From e6071051cf210fb49221ef9a429e78ffa2dfb886 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Mon, 16 Apr 2018 16:00:20 +0100 Subject: [PATCH] Added connect proxy config and local agent state setup on boot. --- agent/agent.go | 79 +++++++++++++++ agent/agent_test.go | 102 +++++++++++++++++++ agent/config/builder.go | 82 +++++++++++++++ agent/config/config.go | 43 ++++++++ agent/config/runtime.go | 35 +++++++ agent/config/runtime_test.go | 79 ++++++++++++++- agent/local/state.go | 181 ++++++++++++++++++++++++++++++++-- agent/local/state_test.go | 129 ++++++++++++++++++++++++ agent/structs/connect.go | 76 ++++++++++++++ agent/structs/connect_test.go | 115 +++++++++++++++++++++ 10 files changed, 911 insertions(+), 10 deletions(-) create mode 100644 agent/structs/connect_test.go diff --git a/agent/agent.go b/agent/agent.go index 4410ff2935..b988029ce2 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -246,6 +246,8 @@ func LocalConfig(cfg *config.RuntimeConfig) local.Config { NodeID: cfg.NodeID, NodeName: cfg.NodeName, TaggedAddresses: map[string]string{}, + ProxyBindMinPort: cfg.ConnectProxyBindMinPort, + ProxyBindMaxPort: cfg.ConnectProxyBindMaxPort, } for k, v := range cfg.TaggedAddresses { lc.TaggedAddresses[k] = v @@ -328,6 +330,9 @@ func (a *Agent) Start() error { if err := a.loadServices(c); err != nil { return err } + if err := a.loadProxies(c); err != nil { + return err + } if err := a.loadChecks(c); err != nil { return err } @@ -1973,6 +1978,58 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error { return nil } +// AddProxy adds a new local Connect Proxy instance to be managed by the agent. +// +// It REQUIRES that the service that is being proxied is already present in the +// local state. Note that this is only used for agent-managed proxies so we can +// ensure that we always make this true. For externally managed and registered +// proxies we explicitly allow the proxy to be registered first to make +// bootstrap ordering of a new service simpler but the same is not true here +// since this is only ever called when setting up a _managed_ proxy which was +// registered as part of a service registration either from config or HTTP API +// call. +func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error { + // Lookup the target service token in state if there is one. + token := a.State.ServiceToken(proxy.TargetServiceID) + + // Add the proxy to local state first since we may need to assign a port which + // needs to be coordinate under state lock. AddProxy will generate the + // NodeService for the proxy populated with the allocated (or configured) port + // and an ID, but it doesn't add it to the agent directly since that could + // deadlock and we may need to coordinate adding it and persisting etc. + proxyService, err := a.State.AddProxy(proxy, token) + if err != nil { + return err + } + + // TODO(banks): register proxy health checks. + err = a.AddService(proxyService, nil, persist, token) + if err != nil { + // Remove the state too + a.State.RemoveProxy(proxyService.ID) + return err + } + + // TODO(banks): persist some of the local proxy state (not the _proxy_ token). + return nil +} + +// RemoveProxy stops and removes a local proxy instance. 
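+// It returns an error if proxyID is empty or not known to the local state.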
+func (a *Agent) RemoveProxy(proxyID string, persist bool) error { + // Validate proxyID + if proxyID == "" { + return fmt.Errorf("proxyID missing") + } + + if err := a.State.RemoveProxy(proxyID); err != nil { + return err + } + + // TODO(banks): unpersist proxy + + return nil +} + func (a *Agent) cancelCheckMonitors(checkID types.CheckID) { // Stop any monitors delete(a.checkReapAfter, checkID) @@ -2366,6 +2423,25 @@ func (a *Agent) unloadChecks() error { return nil } +// loadProxies will load connect proxy definitions from configuration and +// persisted definitions on disk, and load them into the local agent. +func (a *Agent) loadProxies(conf *config.RuntimeConfig) error { + for _, proxy := range conf.ConnectProxies { + if err := a.AddProxy(proxy, false); err != nil { + return fmt.Errorf("failed adding proxy: %s", err) + } + } + + // TODO(banks): persist proxy state and re-load it here? + return nil +} + +// unloadProxies will deregister all proxies known to the local agent. +func (a *Agent) unloadProxies() error { + // TODO(banks): implement me + return nil +} + // snapshotCheckState is used to snapshot the current state of the health // checks. This is done before we reload our checks, so that we can properly // restore into the same state. @@ -2514,6 +2590,9 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { if err := a.loadServices(newCfg); err != nil { return fmt.Errorf("Failed reloading services: %s", err) } + if err := a.loadProxies(newCfg); err != nil { + return fmt.Errorf("Failed reloading proxies: %s", err) + } if err := a.loadChecks(newCfg); err != nil { return fmt.Errorf("Failed reloading checks: %s", err) } diff --git a/agent/agent_test.go b/agent/agent_test.go index df1593bd90..2ee42d7db6 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -15,6 +15,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" @@ -2235,3 +2237,103 @@ func TestAgent_reloadWatchesHTTPS(t *testing.T) { t.Fatalf("bad: %s", err) } } + +func TestAgent_AddProxy(t *testing.T) { + t.Parallel() + a := NewTestAgent(t.Name(), ` + node_name = "node1" + `) + defer a.Shutdown() + + // Register a target service we can use + reg := &structs.NodeService{ + Service: "web", + Port: 8080, + } + require.NoError(t, a.AddService(reg, nil, false, "")) + + tests := []struct { + desc string + proxy *structs.ConnectManagedProxy + wantErr bool + }{ + { + desc: "basic proxy adding, unregistered service", + proxy: &structs.ConnectManagedProxy{ + ExecMode: structs.ProxyExecModeDaemon, + Command: "consul connect proxy", + Config: map[string]interface{}{ + "foo": "bar", + }, + TargetServiceID: "db", // non-existent service. + }, + // Target service must be registered. + wantErr: true, + }, + { + desc: "basic proxy adding, unregistered service", + proxy: &structs.ConnectManagedProxy{ + ExecMode: structs.ProxyExecModeDaemon, + Command: "consul connect proxy", + Config: map[string]interface{}{ + "foo": "bar", + }, + TargetServiceID: "web", + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + require := require.New(t) + + err := a.AddProxy(tt.proxy, false) + if tt.wantErr { + require.Error(err) + return + } + require.NoError(err) + + // Test the ID was created as we expect. 
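+			// (AddProxy derives the ID from the target service ID plus "-proxy".)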
+ got := a.State.Proxy("web-proxy") + require.Equal(tt.proxy, got) + }) + } +} + +func TestAgent_RemoveProxy(t *testing.T) { + t.Parallel() + a := NewTestAgent(t.Name(), ` + node_name = "node1" + `) + defer a.Shutdown() + require := require.New(t) + + // Register a target service we can use + reg := &structs.NodeService{ + Service: "web", + Port: 8080, + } + require.NoError(a.AddService(reg, nil, false, "")) + + // Add a proxy for web + pReg := &structs.ConnectManagedProxy{ + TargetServiceID: "web", + } + require.NoError(a.AddProxy(pReg, false)) + + // Test the ID was created as we expect. + gotProxy := a.State.Proxy("web-proxy") + require.Equal(pReg, gotProxy) + + err := a.RemoveProxy("web-proxy", false) + require.NoError(err) + + gotProxy = a.State.Proxy("web-proxy") + require.Nil(gotProxy) + + // Removing invalid proxy should be an error + err = a.RemoveProxy("foobar", false) + require.Error(err) +} diff --git a/agent/config/builder.go b/agent/config/builder.go index 6048dab929..a6338ae147 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -322,8 +322,15 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { } var services []*structs.ServiceDefinition + var proxies []*structs.ConnectManagedProxy for _, service := range c.Services { services = append(services, b.serviceVal(&service)) + // Register any connect proxies requested + if proxy := b.connectManagedProxyVal(&service); proxy != nil { + proxies = append(proxies, proxy) + } + // TODO(banks): support connect-native registrations (v.Connect.Enabled == + // true) } if c.Service != nil { services = append(services, b.serviceVal(c.Service)) @@ -520,6 +527,9 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { consulRaftHeartbeatTimeout := b.durationVal("consul.raft.heartbeat_timeout", c.Consul.Raft.HeartbeatTimeout) * time.Duration(performanceRaftMultiplier) consulRaftLeaderLeaseTimeout := b.durationVal("consul.raft.leader_lease_timeout", c.Consul.Raft.LeaderLeaseTimeout) * time.Duration(performanceRaftMultiplier) + // Connect proxy defaults. 
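+	// connectProxyPortRange validates any configured bind_min_port/bind_max_port
+	// pair and falls back to the defaults (with a warning) if the range is invalid.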
+ proxyBindMinPort, proxyBindMaxPort := b.connectProxyPortRange(c.Connect) + // ---------------------------------------------------------------- // build runtime config // @@ -638,6 +648,9 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { CheckUpdateInterval: b.durationVal("check_update_interval", c.CheckUpdateInterval), Checks: checks, ClientAddrs: clientAddrs, + ConnectProxies: proxies, + ConnectProxyBindMinPort: proxyBindMinPort, + ConnectProxyBindMaxPort: proxyBindMaxPort, DataDir: b.stringVal(c.DataDir), Datacenter: strings.ToLower(b.stringVal(c.Datacenter)), DevMode: b.boolVal(b.Flags.DevMode), @@ -1010,6 +1023,75 @@ func (b *Builder) serviceVal(v *ServiceDefinition) *structs.ServiceDefinition { } } +func (b *Builder) connectManagedProxyVal(v *ServiceDefinition) *structs.ConnectManagedProxy { + if v.Connect == nil || v.Connect.Proxy == nil { + return nil + } + + p := v.Connect.Proxy + + targetID := b.stringVal(v.ID) + if targetID == "" { + targetID = b.stringVal(v.Name) + } + + execMode := structs.ProxyExecModeDaemon + if p.ExecMode != nil { + switch *p.ExecMode { + case "daemon": + execMode = structs.ProxyExecModeDaemon + case "script": + execMode = structs.ProxyExecModeScript + default: + b.err = multierror.Append(fmt.Errorf( + "service[%s]: invalid connect proxy exec_mode: %s", targetID, + *p.ExecMode)) + return nil + } + } + + return &structs.ConnectManagedProxy{ + ExecMode: execMode, + Command: b.stringVal(p.Command), + Config: p.Config, + // ProxyService will be setup when the agent registers the configured + // proxies and starts them etc. We could do it here but we may need to do + // things like probe the OS for a free port etc. And we have enough info to + // resolve all this later. + ProxyService: nil, + TargetServiceID: targetID, + } +} + +func (b *Builder) connectProxyPortRange(v *Connect) (int, int) { + // Choose this default range just because. There are zero "safe" ranges that + // don't have something somewhere that uses them which is why this is + // configurable. We rely on the host not having any of these ports for non + // agent managed proxies. I went with 20k because I know of at least one + // super-common server memcached that defaults to the 10k range. + start := 20000 + end := 20256 // 256 proxies on a host is enough for anyone ;) + + if v == nil || v.ProxyDefaults == nil { + return start, end + } + + min, max := v.ProxyDefaults.BindMinPort, v.ProxyDefaults.BindMaxPort + if min == nil && max == nil { + return start, end + } + + // If either was set show a warning if the overall range was invalid + if min == nil || max == nil || *max < *min { + b.warn("Connect proxy_defaults bind_min_port and bind_max_port must both "+ + "be set with max >= min. To disable automatic port allocation set both "+ + "to 0. 
Using default range %d..%d.", start, end) + return start, end + } + + return *min, *max +} + func (b *Builder) boolVal(v *bool) bool { if v == nil { return false diff --git a/agent/config/config.go b/agent/config/config.go index 79d274d0dd..f652c90768 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -159,6 +159,7 @@ type Config struct { CheckUpdateInterval *string `json:"check_update_interval,omitempty" hcl:"check_update_interval" mapstructure:"check_update_interval"` Checks []CheckDefinition `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"` ClientAddr *string `json:"client_addr,omitempty" hcl:"client_addr" mapstructure:"client_addr"` + Connect *Connect `json:"connect,omitempty" hcl:"connect" mapstructure:"connect"` DNS DNS `json:"dns_config,omitempty" hcl:"dns_config" mapstructure:"dns_config"` DNSDomain *string `json:"domain,omitempty" hcl:"domain" mapstructure:"domain"` DNSRecursors []string `json:"recursors,omitempty" hcl:"recursors" mapstructure:"recursors"` @@ -324,6 +325,7 @@ type ServiceDefinition struct { Checks []CheckDefinition `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"` Token *string `json:"token,omitempty" hcl:"token" mapstructure:"token"` EnableTagOverride *bool `json:"enable_tag_override,omitempty" hcl:"enable_tag_override" mapstructure:"enable_tag_override"` + Connect *ServiceConnect `json:"connect,omitempty" hcl:"connect" mapstructure:"connect"` } type CheckDefinition struct { @@ -349,6 +351,47 @@ type CheckDefinition struct { DeregisterCriticalServiceAfter *string `json:"deregister_critical_service_after,omitempty" hcl:"deregister_critical_service_after" mapstructure:"deregister_critical_service_after"` } +// ServiceConnect is the connect block within a service registration +type ServiceConnect struct { + // TODO(banks) add way to specify that the app is connect-native + // Proxy configures a connect proxy instance for the service + Proxy *ServiceConnectProxy `json:"proxy,omitempty" hcl:"proxy" mapstructure:"proxy"` +} + +type ServiceConnectProxy struct { + Command *string `json:"command,omitempty" hcl:"command" mapstructure:"command"` + ExecMode *string `json:"exec_mode,omitempty" hcl:"exec_mode" mapstructure:"exec_mode"` + Config map[string]interface{} `json:"config,omitempty" hcl:"config" mapstructure:"config"` +} + +// Connect is the agent-global connect configuration. +type Connect struct { + // Enabled opts the agent into connect. It should be set on all clients and + // servers in a cluster for correct connect operation. TODO(banks) review that. + Enabled bool `json:"enabled,omitempty" hcl:"enabled" mapstructure:"enabled"` + ProxyDefaults *ConnectProxyDefaults `json:"proxy_defaults,omitempty" hcl:"proxy_defaults" mapstructure:"proxy_defaults"` +} + +// ConnectProxyDefaults is the agent-global connect proxy configuration. +type ConnectProxyDefaults struct { + // BindMinPort, BindMaxPort are the inclusive lower and upper bounds on the + // port range allocated to the agent to assign to connect proxies that have no + // bind_port specified. + BindMinPort *int `json:"bind_min_port,omitempty" hcl:"bind_min_port" mapstructure:"bind_min_port"` + BindMaxPort *int `json:"bind_max_port,omitempty" hcl:"bind_max_port" mapstructure:"bind_max_port"` + // ExecMode is used where a registration doesn't include an exec_mode. + // Defaults to daemon. 
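+	// Valid values are "daemon" and "script".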
+ ExecMode *string `json:"exec_mode,omitempty" hcl:"exec_mode" mapstructure:"exec_mode"` + // DaemonCommand is used to start proxy in exec_mode = daemon if not specified + // at registration time. + DaemonCommand *string `json:"daemon_command,omitempty" hcl:"daemon_command" mapstructure:"daemon_command"` + // ScriptCommand is used to start proxy in exec_mode = script if not specified + // at registration time. + ScriptCommand *string `json:"script_command,omitempty" hcl:"script_command" mapstructure:"script_command"` + // Config is merged into an Config specified at registration time. + Config map[string]interface{} `json:"config,omitempty" hcl:"config" mapstructure:"config"` +} + type DNS struct { AllowStale *bool `json:"allow_stale,omitempty" hcl:"allow_stale" mapstructure:"allow_stale"` ARecordLimit *int `json:"a_record_limit,omitempty" hcl:"a_record_limit" mapstructure:"a_record_limit"` diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 66e7e79e7b..55c15d14e4 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -616,6 +616,41 @@ type RuntimeConfig struct { // flag: -client string ClientAddrs []*net.IPAddr + // ConnectEnabled opts the agent into connect. It should be set on all clients + // and servers in a cluster for correct connect operation. TODO(banks) review + // that. + ConnectEnabled bool + + // ConnectProxies is a list of configured proxies taken from the "connect" + // block of service registrations. + ConnectProxies []*structs.ConnectManagedProxy + + // ConnectProxyBindMinPort is the inclusive start of the range of ports + // allocated to the agent for starting proxy listeners on where no explicit + // port is specified. + ConnectProxyBindMinPort int + + // ConnectProxyBindMaxPort is the inclusive end of the range of ports + // allocated to the agent for starting proxy listeners on where no explicit + // port is specified. + ConnectProxyBindMaxPort int + + // ConnectProxyDefaultExecMode is used where a registration doesn't include an + // exec_mode. Defaults to daemon. + ConnectProxyDefaultExecMode *string + + // ConnectProxyDefaultDaemonCommand is used to start proxy in exec_mode = + // daemon if not specified at registration time. + ConnectProxyDefaultDaemonCommand *string + + // ConnectProxyDefaultScriptCommand is used to start proxy in exec_mode = + // script if not specified at registration time. + ConnectProxyDefaultScriptCommand *string + + // ConnectProxyDefaultConfig is merged with any config specified at + // registration time to allow global control of defaults. + ConnectProxyDefaultConfig map[string]interface{} + // DNSAddrs contains the list of TCP and UDP addresses the DNS server will // bind to. If the DNS endpoint is disabled (ports.dns <= 0) the list is // empty. 
diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 060215c355..e990f0689e 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -2353,6 +2353,21 @@ func TestFullConfig(t *testing.T) { ], "check_update_interval": "16507s", "client_addr": "93.83.18.19", + "connect": { + "enabled": true, + "proxy_defaults": { + "bind_min_port": 2000, + "bind_max_port": 3000, + "exec_mode": "script", + "daemon_command": "consul connect proxy", + "script_command": "proxyctl.sh", + "config": { + "foo": "bar", + "connect_timeout_ms": 1000, + "pedantic_mode": true + } + } + }, "data_dir": "` + dataDir + `", "datacenter": "rzo029wg", "disable_anonymous_signature": true, @@ -2613,7 +2628,16 @@ func TestFullConfig(t *testing.T) { "ttl": "11222s", "deregister_critical_service_after": "68482s" } - ] + ], + "connect": { + "proxy": { + "exec_mode": "daemon", + "command": "awesome-proxy", + "config": { + "foo": "qux" + } + } + } } ], "session_ttl_min": "26627s", @@ -2786,6 +2810,21 @@ func TestFullConfig(t *testing.T) { ] check_update_interval = "16507s" client_addr = "93.83.18.19" + connect { + enabled = true + proxy_defaults { + bind_min_port = 2000 + bind_max_port = 3000 + exec_mode = "script" + daemon_command = "consul connect proxy" + script_command = "proxyctl.sh" + config = { + foo = "bar" + connect_timeout_ms = 1000 + pedantic_mode = true + } + } + } data_dir = "` + dataDir + `" datacenter = "rzo029wg" disable_anonymous_signature = true @@ -3047,6 +3086,15 @@ func TestFullConfig(t *testing.T) { deregister_critical_service_after = "68482s" } ] + connect { + proxy { + exec_mode = "daemon" + command = "awesome-proxy" + config = { + foo = "qux" + } + } + } } ] session_ttl_min = "26627s" @@ -3355,8 +3403,23 @@ func TestFullConfig(t *testing.T) { DeregisterCriticalServiceAfter: 13209 * time.Second, }, }, - CheckUpdateInterval: 16507 * time.Second, - ClientAddrs: []*net.IPAddr{ipAddr("93.83.18.19")}, + CheckUpdateInterval: 16507 * time.Second, + ClientAddrs: []*net.IPAddr{ipAddr("93.83.18.19")}, + ConnectProxies: []*structs.ConnectManagedProxy{ + { + ExecMode: structs.ProxyExecModeDaemon, + Command: "awesome-proxy", + Config: map[string]interface{}{ + "foo": "qux", // Overriden by service + // Note globals are not merged here but on rendering to the proxy + // endpoint. That's because proxies can be added later too so merging + // at config time is redundant if we have to do it later anyway. 
+ }, + TargetServiceID: "MRHVMZuD", + }, + }, + ConnectProxyBindMinPort: 2000, + ConnectProxyBindMaxPort: 3000, DNSAddrs: []net.Addr{tcpAddr("93.95.95.81:7001"), udpAddr("93.95.95.81:7001")}, DNSARecordLimit: 29907, DNSAllowStale: true, @@ -4018,6 +4081,14 @@ func TestSanitize(t *testing.T) { } ], "ClientAddrs": [], + "ConnectEnabled": false, + "ConnectProxies": [], + "ConnectProxyBindMaxPort": 0, + "ConnectProxyBindMinPort": 0, + "ConnectProxyDefaultConfig": {}, + "ConnectProxyDefaultDaemonCommand": null, + "ConnectProxyDefaultExecMode": null, + "ConnectProxyDefaultScriptCommand": null, "ConsulCoordinateUpdateBatchSize": 0, "ConsulCoordinateUpdateMaxBatches": 0, "ConsulCoordinateUpdatePeriod": "15s", @@ -4150,9 +4221,11 @@ func TestSanitize(t *testing.T) { "Checks": [], "EnableTagOverride": false, "ID": "", + "Kind": "", "Meta": {}, "Name": "foo", "Port": 0, + "ProxyDestination": "", "Tags": [], "Token": "hidden" } diff --git a/agent/local/state.go b/agent/local/state.go index f19e88a766..47a0069436 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -3,6 +3,7 @@ package local import ( "fmt" "log" + "math/rand" "reflect" "strconv" "strings" @@ -10,6 +11,8 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" @@ -27,6 +30,8 @@ type Config struct { NodeID types.NodeID NodeName string TaggedAddresses map[string]string + ProxyBindMinPort int + ProxyBindMaxPort int } // ServiceState describes the state of a service record. @@ -107,6 +112,21 @@ type rpc interface { RPC(method string, args interface{}, reply interface{}) error } +// ManagedProxy represents the local state for a registered proxy instance. +type ManagedProxy struct { + Proxy *structs.ConnectManagedProxy + + // ProxyToken is a special local-only security token that grants the bearer + // access to the proxy's config as well as allowing it to request certificates + // on behalf of the TargetService. Certain connect endpoints will validate + // against this token and if it matches will then use the TargetService.Token + // to actually authenticate the upstream RPC on behalf of the service. This + // token is passed securely to the proxy process via ENV vars and should never + // be exposed any other way. Unmanaged proxies will never see this and need to + // use service-scoped ACL tokens distributed externally. + ProxyToken string +} + // State is used to represent the node's services, // and checks. We use it to perform anti-entropy with the // catalog representation @@ -150,17 +170,28 @@ type State struct { // tokens contains the ACL tokens tokens *token.Store + + // managedProxies is a map of all manged connect proxies registered locally on + // this agent. This is NOT kept in sync with servers since it's agent-local + // config only. Proxy instances have separate service registrations in the + // services map above which are kept in sync via anti-entropy. Un-managed + // proxies (that registered themselves separately from the service + // registration) do not appear here as the agent doesn't need to manage their + // process nor config. The _do_ still exist in services above though as + // services with Kind == connect-proxy. + managedProxies map[string]*ManagedProxy } -// NewLocalState creates a new local state for the agent. +// NewState creates a new local state for the agent. 
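+// It also tracks agent-managed Connect proxies registered via AddProxy.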
 func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
 	l := &State{
-		config:   c,
-		logger:   lg,
-		services: make(map[string]*ServiceState),
-		checks:   make(map[types.CheckID]*CheckState),
-		metadata: make(map[string]string),
-		tokens:   tokens,
+		config:         c,
+		logger:         lg,
+		services:       make(map[string]*ServiceState),
+		checks:         make(map[types.CheckID]*CheckState),
+		metadata:       make(map[string]string),
+		tokens:         tokens,
+		managedProxies: make(map[string]*ManagedProxy),
 	}
 	l.SetDiscardCheckOutput(c.DiscardCheckOutput)
 	return l
@@ -529,6 +560,142 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
 	return m
 }
 
+// AddProxy is used to add a connect proxy entry to the local state. This
+// assumes the proxy's NodeService is already registered via Agent.AddService
+// (since that has to do other bookkeeping). The token passed here is the ACL
+// token the service used to register itself, so it must have write access to
+// the service record.
+func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*structs.NodeService, error) {
+	if proxy == nil {
+		return nil, fmt.Errorf("no proxy")
+	}
+
+	// Lookup the local service
+	target := l.Service(proxy.TargetServiceID)
+	if target == nil {
+		return nil, fmt.Errorf("target service ID %s not registered",
+			proxy.TargetServiceID)
+	}
+
+	// Get bind info from config
+	cfg, err := proxy.ParseConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	// Construct almost all of the NodeService that needs to be registered by the
+	// caller outside of the lock.
+	svc := &structs.NodeService{
+		Kind:             structs.ServiceKindConnectProxy,
+		ID:               target.ID + "-proxy",
+		Service:          target.ID + "-proxy",
+		ProxyDestination: target.Service,
+		Address:          cfg.BindAddress,
+		Port:             cfg.BindPort,
+	}
+
+	pToken, err := uuid.GenerateUUID()
+	if err != nil {
+		return nil, err
+	}
+
+	// Lock now. We can't lock earlier as l.Service would deadlock and we
+	// shouldn't anyway to minimise the critical section.
+	l.Lock()
+	defer l.Unlock()
+
+	// Allocate port if needed (min and max inclusive)
+	rangeLen := l.config.ProxyBindMaxPort - l.config.ProxyBindMinPort + 1
+	if svc.Port < 1 && l.config.ProxyBindMinPort > 0 && rangeLen > 0 {
+		// This should be a really short list so don't bother optimising lookup yet.
+	OUTER:
+		for _, offset := range rand.Perm(rangeLen) {
+			p := l.config.ProxyBindMinPort + offset
+			// See if this port was already allocated to another proxy
+			for _, other := range l.managedProxies {
+				if other.Proxy.ProxyService.Port == p {
+					// already taken, skip to next random pick in the range
+					continue OUTER
+				}
+			}
+			// We made it through all existing proxies without a match so claim this one
+			svc.Port = p
+			break
+		}
+	}
+	// If no ports left (or auto ports disabled) fail
+	if svc.Port < 1 {
+		return nil, fmt.Errorf("no port provided for proxy bind_port and none "+
+			"left in the allocated range [%d, %d]", l.config.ProxyBindMinPort,
+			l.config.ProxyBindMaxPort)
+	}
+
+	proxy.ProxyService = svc
+
+	// All set, add the proxy and return the service
+	l.managedProxies[svc.ID] = &ManagedProxy{
+		Proxy:      proxy,
+		ProxyToken: pToken,
+	}
+
+	// No need to trigger sync as proxy state is local only.
+	return svc, nil
+}
+
+// RemoveProxy is used to remove a proxy entry from the local state.
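+// It returns an error if no proxy with the given ID is registered.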
+func (l *State) RemoveProxy(id string) error {
+	l.Lock()
+	defer l.Unlock()

+	p := l.managedProxies[id]
+	if p == nil {
+		return fmt.Errorf("proxy %s does not exist", id)
+	}
+	delete(l.managedProxies, id)
+
+	// No need to trigger sync as proxy state is local only.
+	return nil
+}
+
+// Proxy returns the local proxy state.
+func (l *State) Proxy(id string) *structs.ConnectManagedProxy {
+	l.RLock()
+	defer l.RUnlock()
+
+	p := l.managedProxies[id]
+	if p == nil {
+		return nil
+	}
+	return p.Proxy
+}
+
+// Proxies returns the locally registered proxies.
+func (l *State) Proxies() map[string]*structs.ConnectManagedProxy {
+	l.RLock()
+	defer l.RUnlock()
+
+	m := make(map[string]*structs.ConnectManagedProxy)
+	for id, p := range l.managedProxies {
+		m[id] = p.Proxy
+	}
+	return m
+}
+
+// ProxyToken returns the local proxy token for a given proxy. Note this is not
+// an ACL token so it won't fall back to using the agent-configured default ACL
+// token. If the proxy doesn't exist an error is returned, otherwise the token
+// is guaranteed to exist.
+func (l *State) ProxyToken(id string) (string, error) {
+	l.RLock()
+	defer l.RUnlock()
+
+	p := l.managedProxies[id]
+	if p == nil {
+		return "", fmt.Errorf("proxy %s not registered", id)
+	}
+	return p.ProxyToken, nil
+}
+
 // Metadata returns the local node metadata fields that the
 // agent is aware of and are being kept in sync with the server
 func (l *State) Metadata() map[string]string {
diff --git a/agent/local/state_test.go b/agent/local/state_test.go
index d0c006a952..6950cd4770 100644
--- a/agent/local/state_test.go
+++ b/agent/local/state_test.go
@@ -3,10 +3,14 @@ package local_test
 import (
 	"errors"
 	"fmt"
+	"log"
+	"os"
 	"reflect"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/require"
+
 	"github.com/hashicorp/consul/agent"
 	"github.com/hashicorp/consul/agent/config"
 	"github.com/hashicorp/consul/agent/local"
@@ -1664,3 +1668,128 @@ func checksInSync(state *local.State, wantChecks int) error {
 	}
 	return nil
 }
+
+func TestStateProxyManagement(t *testing.T) {
+	t.Parallel()
+
+	state := local.NewState(local.Config{
+		ProxyBindMinPort: 20000,
+		ProxyBindMaxPort: 20001,
+	}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
+
+	// Stub state syncing
+	state.TriggerSyncChanges = func() {}
+
+	p1 := structs.ConnectManagedProxy{
+		ExecMode:        structs.ProxyExecModeDaemon,
+		Command:         "consul connect proxy",
+		TargetServiceID: "web",
+	}
+
+	require := require.New(t)
+	assert := assert.New(t)
+
+	_, err := state.AddProxy(&p1, "fake-token")
+	require.Error(err, "should fail as the target service isn't registered")
+
+	// Sanity check done, let's add a couple of target services to the state
+	err = state.AddService(&structs.NodeService{
+		Service: "web",
+	}, "fake-token-web")
+	require.NoError(err)
+	err = state.AddService(&structs.NodeService{
+		Service: "cache",
+	}, "fake-token-cache")
+	require.NoError(err)
+	require.NoError(err)
+	err = state.AddService(&structs.NodeService{
+		Service: "db",
+	}, "fake-token-db")
+	require.NoError(err)
+
+	// Should work now
+	svc, err := state.AddProxy(&p1, "fake-token")
+	require.NoError(err)
+
+	assert.Equal("web-proxy", svc.ID)
+	assert.Equal("web-proxy", svc.Service)
+	assert.Equal(structs.ServiceKindConnectProxy, svc.Kind)
+	assert.Equal("web", svc.ProxyDestination)
+	assert.Equal("", svc.Address, "should have empty address by default")
+	// Port is non-deterministic but could be either of 20000 or 20001
+	assert.Contains([]int{20000, 20001}, svc.Port)
+
+	// Second proxy should claim the other port
+	p2 := p1
+	p2.TargetServiceID = "cache"
+	svc2, err := state.AddProxy(&p2, "fake-token")
+	require.NoError(err)
+	assert.Contains([]int{20000, 20001}, svc2.Port)
+	assert.NotEqual(svc.Port, svc2.Port)
+
+	// Just saving this for later...
+	p2Token, err := state.ProxyToken(svc2.ID)
+	require.NoError(err)
+
+	// Third proxy should fail as all ports are used
+	p3 := p1
+	p3.TargetServiceID = "db"
+	_, err = state.AddProxy(&p3, "fake-token")
+	require.Error(err)
+
+	// But if we set a port explicitly it should be OK
+	p3.Config = map[string]interface{}{
+		"bind_port":    1234,
+		"bind_address": "0.0.0.0",
+	}
+	svc3, err := state.AddProxy(&p3, "fake-token")
+	require.NoError(err)
+	require.Equal("0.0.0.0", svc3.Address)
+	require.Equal(1234, svc3.Port)
+
+	// Remove one of the auto-assigned proxies
+	err = state.RemoveProxy(svc2.ID)
+	require.NoError(err)
+
+	// Should be able to create a new proxy for that service with the port (it
+	// should have been "freed").
+	p4 := p2
+	svc4, err := state.AddProxy(&p4, "fake-token")
+	require.NoError(err)
+	assert.Contains([]int{20000, 20001}, svc4.Port)
+	assert.Equal(svc4.Port, svc2.Port, "should get the same port back that we freed")
+
+	// Removing a proxy that doesn't exist should error
+	err = state.RemoveProxy("nope")
+	require.Error(err)
+
+	assert.Equal(&p4, state.Proxy(p4.ProxyService.ID),
+		"should fetch the right proxy details")
+	assert.Nil(state.Proxy("nope"))
+
+	proxies := state.Proxies()
+	assert.Len(proxies, 3)
+	assert.Equal(&p1, proxies[svc.ID])
+	assert.Equal(&p4, proxies[svc4.ID])
+	assert.Equal(&p3, proxies[svc3.ID])
+
+	tokens := make([]string, 4)
+	tokens[0], err = state.ProxyToken(svc.ID)
+	require.NoError(err)
+	// p2 not registered anymore but let's make sure p4 got a new token when it
+	// re-registered with the same ID.
+	tokens[1] = p2Token
+	tokens[2], err = state.ProxyToken(svc3.ID)
+	require.NoError(err)
+	tokens[3], err = state.ProxyToken(svc4.ID)
+	require.NoError(err)
+
+	// Quick check all are distinct
+	for i := 0; i < len(tokens)-1; i++ {
+		assert.Len(tokens[i], 36) // Sanity check for UUIDish thing.
+		for j := i + 1; j < len(tokens); j++ {
+			assert.NotEqual(tokens[i], tokens[j], "tokens for proxy %d and %d match",
+				i+1, j+1)
+		}
+	}
+}
diff --git a/agent/structs/connect.go b/agent/structs/connect.go
index 7f08615d39..6f11c5fe3f 100644
--- a/agent/structs/connect.go
+++ b/agent/structs/connect.go
@@ -1,5 +1,9 @@
 package structs
 
+import (
+	"github.com/mitchellh/mapstructure"
+)
+
 // ConnectAuthorizeRequest is the structure of a request to authorize
 // a connection.
 type ConnectAuthorizeRequest struct {
@@ -15,3 +19,75 @@ type ConnectAuthorizeRequest struct {
 	ClientCertURI    string
 	ClientCertSerial string
 }
+
+// ProxyExecMode encodes the mode for running a managed connect proxy.
+type ProxyExecMode int
+
+const (
+	// ProxyExecModeDaemon executes a proxy process as a supervised daemon.
+	ProxyExecModeDaemon ProxyExecMode = iota
+
+	// ProxyExecModeScript executes a proxy config script on each change to its
+	// config.
+	ProxyExecModeScript
+)
+
+// ConnectManagedProxy represents the agent-local state for a configured proxy
+// instance. This is never stored or sent to the servers and is only used to
+// store the config for the proxy that the agent needs to track. For now it's
+// really generic with only the fields the agent needs to act on defined, while
+// the rest of the proxy config is passed as an opaque bag of attributes to
+// support arbitrary config params for third-party proxy integrations.
+// "External" proxies by definition register themselves and manage their own
+// config externally so are never represented in agent state.
+type ConnectManagedProxy struct {
+	// ExecMode is one of daemon or script.
+	ExecMode ProxyExecMode
+
+	// Command is the command to execute. Empty defaults to self-invoking the same
+	// consul binary with the proxy subcommand for ProxyExecModeDaemon and is an
+	// error for ProxyExecModeScript.
+	Command string
+
+	// Config is the arbitrary configuration data provided with the registration.
+	Config map[string]interface{}
+
+	// ProxyService is a pointer to the local proxy's service record for
+	// convenience. The proxy's ID and name etc. can be read from there. It may be
+	// nil if the agent is starting up and hasn't registered the service yet.
+	ProxyService *NodeService
+
+	// TargetServiceID is the ID of the target service on the localhost. It may
+	// not exist yet since bootstrapping is allowed to happen in either order.
+	TargetServiceID string
+}
+
+// ConnectManagedProxyConfig represents the parts of the proxy config the agent
+// needs to understand. It's bad UX to make the user specify these separately
+// just to make parsing simpler for us so this encapsulates the fields in
+// ConnectManagedProxy.Config that we care about. They are all optional anyway
+// and this is used to decode them with mapstructure.
+type ConnectManagedProxyConfig struct {
+	BindAddress string `mapstructure:"bind_address"`
+	BindPort    int    `mapstructure:"bind_port"`
+}
+
+// ParseConfig attempts to read the fields we care about from the otherwise
+// opaque config map. They are all optional but it may fail if one is specified
+// with an invalid value.
+func (p *ConnectManagedProxy) ParseConfig() (*ConnectManagedProxyConfig, error) {
+	var cfg ConnectManagedProxyConfig
+	d, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
+		ErrorUnused:      false,
+		WeaklyTypedInput: true, // allow string port etc.
+ Result: &cfg, + }) + if err != nil { + return nil, err + } + err = d.Decode(p.Config) + if err != nil { + return nil, err + } + return &cfg, nil +} diff --git a/agent/structs/connect_test.go b/agent/structs/connect_test.go new file mode 100644 index 0000000000..905ae09ef7 --- /dev/null +++ b/agent/structs/connect_test.go @@ -0,0 +1,115 @@ +package structs + +import ( + "reflect" + "testing" +) + +func TestConnectManagedProxy_ParseConfig(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + want *ConnectManagedProxyConfig + wantErr bool + }{ + { + name: "empty", + config: nil, + want: &ConnectManagedProxyConfig{}, + wantErr: false, + }, + { + name: "specified", + config: map[string]interface{}{ + "bind_address": "127.0.0.1", + "bind_port": 1234, + }, + want: &ConnectManagedProxyConfig{ + BindAddress: "127.0.0.1", + BindPort: 1234, + }, + wantErr: false, + }, + { + name: "stringy port", + config: map[string]interface{}{ + "bind_address": "127.0.0.1", + "bind_port": "1234", + }, + want: &ConnectManagedProxyConfig{ + BindAddress: "127.0.0.1", + BindPort: 1234, + }, + wantErr: false, + }, + { + name: "empty addr", + config: map[string]interface{}{ + "bind_address": "", + "bind_port": "1234", + }, + want: &ConnectManagedProxyConfig{ + BindAddress: "", + BindPort: 1234, + }, + wantErr: false, + }, + { + name: "empty port", + config: map[string]interface{}{ + "bind_address": "127.0.0.1", + "bind_port": "", + }, + want: nil, + wantErr: true, + }, + { + name: "junk address", + config: map[string]interface{}{ + "bind_address": 42, + "bind_port": "", + }, + want: nil, + wantErr: true, + }, + { + name: "zero port, missing addr", + config: map[string]interface{}{ + "bind_port": 0, + }, + want: &ConnectManagedProxyConfig{ + BindPort: 0, + }, + wantErr: false, + }, + { + name: "extra fields present", + config: map[string]interface{}{ + "bind_port": 1234, + "flamingos": true, + "upstream": []map[string]interface{}{ + {"foo": "bar"}, + }, + }, + want: &ConnectManagedProxyConfig{ + BindPort: 1234, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &ConnectManagedProxy{ + Config: tt.config, + } + got, err := p.ParseConfig() + if (err != nil) != tt.wantErr { + t.Errorf("ConnectManagedProxy.ParseConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ConnectManagedProxy.ParseConfig() = %v, want %v", got, tt.want) + } + }) + } +}
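
Usage sketch (illustrative only, not part of the diff): a minimal example of the
configuration surface this change adds, using the keys exercised by TestFullConfig
above. The service name, ports and commands here are placeholders.

    # Agent-level Connect settings and managed-proxy defaults.
    connect {
      enabled = true
      proxy_defaults {
        bind_min_port  = 20000
        bind_max_port  = 20255
        exec_mode      = "daemon"
        daemon_command = "consul connect proxy"
      }
    }

    # Service registration requesting a managed proxy. If config.bind_port is
    # omitted, local.State.AddProxy allocates a free port from the range above.
    service {
      name = "web"
      port = 8080
      connect {
        proxy {
          exec_mode = "daemon"
          command   = "consul connect proxy"
          config = {
            bind_address = "127.0.0.1"
          }
        }
      }
    }

On boot or reload the agent turns each such block into a
structs.ConnectManagedProxy (Agent.loadProxies -> Agent.AddProxy), and
local.State.AddProxy registers the derived "<service-id>-proxy" NodeService with
ProxyDestination set to the target service.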