Browse Source

Working proxy config reload tests

pull/4275/head
Paul Banks 7 years ago committed by Mitchell Hashimoto
parent
commit
ab3df3d4a6
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
  1. 133
      connect/proxy/config.go
  2. 125
      connect/proxy/config_test.go
  3. 3
      connect/proxy/testdata/config-kitchensink.hcl
  4. 0
      key.pem
  5. 3
      watch/funcs.go

133
connect/proxy/config.go

@ -5,8 +5,11 @@ import (
"io/ioutil"
"log"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/hcl"
)
@ -59,21 +62,23 @@ type Config struct {
// PublicListenerConfig contains the parameters needed for the incoming mTLS
// listener.
type PublicListenerConfig struct {
// BindAddress is the host:port the public mTLS listener will bind to.
BindAddress string `json:"bind_address" hcl:"bind_address"`
// BindAddress is the host/IP the public mTLS listener will bind to.
BindAddress string `json:"bind_address" hcl:"bind_address" mapstructure:"bind_address"`
BindPort string `json:"bind_port" hcl:"bind_port" mapstructure:"bind_port"`
// LocalServiceAddress is the host:port for the proxied application. This
// should be on loopback or otherwise protected as it's plain TCP.
LocalServiceAddress string `json:"local_service_address" hcl:"local_service_address"`
LocalServiceAddress string `json:"local_service_address" hcl:"local_service_address" mapstructure:"local_service_address"`
// LocalConnectTimeout is the timeout for establishing connections with the
// local backend. Defaults to 1000 (1s).
LocalConnectTimeoutMs int `json:"local_connect_timeout_ms" hcl:"local_connect_timeout_ms"`
LocalConnectTimeoutMs int `json:"local_connect_timeout_ms" hcl:"local_connect_timeout_ms" mapstructure:"local_connect_timeout_ms"`
// HandshakeTimeout is the timeout for incoming mTLS clients to complete a
// handshake. Setting this low avoids DOS by malicious clients holding
// resources open. Defaults to 10000 (10s).
HandshakeTimeoutMs int `json:"handshake_timeout_ms" hcl:"handshake_timeout_ms"`
HandshakeTimeoutMs int `json:"handshake_timeout_ms" hcl:"handshake_timeout_ms" mapstructure:"handshake_timeout_ms"`
}
// applyDefaults sets zero-valued params to a sane default.
@ -88,26 +93,28 @@ func (plc *PublicListenerConfig) applyDefaults() {
// UpstreamConfig configures an upstream (outgoing) listener.
type UpstreamConfig struct {
// LocalAddress is the host:port to listen on for local app connections.
LocalBindAddress string `json:"local_bind_address" hcl:"local_bind_address,attr"`
// LocalAddress is the host/ip to listen on for local app connections. Defaults to 127.0.0.1.
LocalBindAddress string `json:"local_bind_address" hcl:"local_bind_address,attr" mapstructure:"local_bind_address"`
LocalBindPort int `json:"local_bind_port" hcl:"local_bind_port,attr" mapstructure:"local_bind_port"`
// DestinationName is the service name of the destination.
DestinationName string `json:"destination_name" hcl:"destination_name,attr"`
DestinationName string `json:"destination_name" hcl:"destination_name,attr" mapstructure:"destination_name"`
// DestinationNamespace is the namespace of the destination.
DestinationNamespace string `json:"destination_namespace" hcl:"destination_namespace,attr"`
DestinationNamespace string `json:"destination_namespace" hcl:"destination_namespace,attr" mapstructure:"destination_namespace"`
// DestinationType determines which service discovery method is used to find a
// candidate instance to connect to.
DestinationType string `json:"destination_type" hcl:"destination_type,attr"`
DestinationType string `json:"destination_type" hcl:"destination_type,attr" mapstructure:"destination_type"`
// DestinationDatacenter is the datacenter the destination is in. If empty,
// defaults to discovery within the same datacenter.
DestinationDatacenter string `json:"destination_datacenter" hcl:"destination_datacenter,attr"`
DestinationDatacenter string `json:"destination_datacenter" hcl:"destination_datacenter,attr" mapstructure:"destination_datacenter"`
// ConnectTimeout is the timeout for establishing connections with the remote
// service instance. Defaults to 10,000 (10s).
ConnectTimeoutMs int `json:"connect_timeout_ms" hcl:"connect_timeout_ms,attr"`
ConnectTimeoutMs int `json:"connect_timeout_ms" hcl:"connect_timeout_ms,attr" mapstructure:"connect_timeout_ms"`
// resolver is used to plug in the service discover mechanism. It can be used
// in tests to bypass discovery. In real usage it is used to inject the
@ -121,13 +128,22 @@ func (uc *UpstreamConfig) applyDefaults() {
if uc.ConnectTimeoutMs == 0 {
uc.ConnectTimeoutMs = 10000
}
if uc.DestinationType == "" {
uc.DestinationType = "service"
}
if uc.DestinationNamespace == "" {
uc.DestinationNamespace = "default"
}
if uc.LocalBindAddress == "" {
uc.LocalBindAddress = "127.0.0.1"
}
}
// String returns a string that uniquely identifies the Upstream. Used for
// identifying the upstream in log output and map keys.
func (uc *UpstreamConfig) String() string {
return fmt.Sprintf("%s->%s:%s/%s", uc.LocalBindAddress, uc.DestinationType,
uc.DestinationNamespace, uc.DestinationName)
return fmt.Sprintf("%s:%d->%s:%s/%s", uc.LocalBindAddress, uc.LocalBindPort,
uc.DestinationType, uc.DestinationNamespace, uc.DestinationName)
}
// UpstreamResolverFromClient returns a ConsulResolver that can resolve the
@ -212,12 +228,93 @@ type AgentConfigWatcher struct {
client *api.Client
proxyID string
logger *log.Logger
ch chan *Config
plan *watch.Plan
}
// NewAgentConfigWatcher creates an AgentConfigWatcher.
func NewAgentConfigWatcher(client *api.Client, proxyID string,
logger *log.Logger) (*AgentConfigWatcher, error) {
w := &AgentConfigWatcher{
client: client,
proxyID: proxyID,
logger: logger,
ch: make(chan *Config),
}
// Setup watch plan for config
plan, err := watch.Parse(map[string]interface{}{
"type": "connect_proxy_config",
"proxy_service_id": w.proxyID,
})
if err != nil {
return nil, err
}
w.plan = plan
w.plan.Handler = w.handler
go w.plan.RunWithClientAndLogger(w.client, w.logger)
return w, nil
}
func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParam,
val interface{}) {
log.Printf("DEBUG: got hash %s", blockVal.(watch.WaitHashVal))
resp, ok := val.(*api.ConnectProxyConfig)
if !ok {
w.logger.Printf("[WARN] proxy config watch returned bad response: %v", val)
return
}
// Setup Service instance now we know target ID etc
service, err := connect.NewService(resp.TargetServiceID, w.client)
if err != nil {
w.logger.Printf("[WARN] proxy config watch failed to initialize"+
" service: %s", err)
return
}
// Create proxy config from the response
cfg := &Config{
ProxyID: w.proxyID,
// Token should be already setup in the client
ProxiedServiceID: resp.TargetServiceID,
ProxiedServiceNamespace: "default",
service: service,
}
// Unmarshal configs
err = mapstructure.Decode(resp.Config, &cfg.PublicListener)
if err != nil {
w.logger.Printf("[ERR] proxy config watch public listener config "+
"couldn't be parsed: %s", err)
return
}
cfg.PublicListener.applyDefaults()
err = mapstructure.Decode(resp.Config["upstreams"], &cfg.Upstreams)
if err != nil {
w.logger.Printf("[ERR] proxy config watch upstream listener config "+
"couldn't be parsed: %s", err)
return
}
for i := range cfg.Upstreams {
cfg.Upstreams[i].applyDefaults()
}
// Parsed config OK, deliver it!
w.ch <- cfg
}
// Watch implements ConfigWatcher.
func (w *AgentConfigWatcher) Watch() <-chan *Config {
watch := make(chan *Config)
// TODO implement me, note we need to discover the Service instance to use and
// set it on the Config we return.
return watch
return w.ch
}
// Close frees watcher resources and implements io.Closer
func (w *AgentConfigWatcher) Close() error {
if w.plan != nil {
w.plan.Stop()
}
return nil
}

125
connect/proxy/config_test.go

@ -1,8 +1,15 @@
package proxy
import (
"log"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect"
"github.com/stretchr/testify/require"
)
@ -106,3 +113,121 @@ func TestUpstreamResolverFromClient(t *testing.T) {
})
}
}
func TestAgentConfigWatcher(t *testing.T) {
a := agent.NewTestAgent("agent_smith", "")
client := a.Client()
agent := client.Agent()
// Register a service with a proxy
// Register a local agent service with a managed proxy
reg := &api.AgentServiceRegistration{
Name: "web",
Port: 8080,
Connect: &api.AgentServiceConnect{
Proxy: &api.AgentServiceConnectProxy{
Config: map[string]interface{}{
"bind_address": "10.10.10.10",
"bind_port": "1010",
"local_service_address": "127.0.0.1:5000",
"handshake_timeout_ms": 999,
"upstreams": []interface{}{
map[string]interface{}{
"destination_name": "db",
"local_bind_port": 9191,
},
},
},
},
},
}
err := agent.ServiceRegister(reg)
require.NoError(t, err)
w, err := NewAgentConfigWatcher(client, "web-proxy",
log.New(os.Stderr, "", log.LstdFlags))
require.NoError(t, err)
cfg := testGetConfigValTimeout(t, w, 500*time.Millisecond)
expectCfg := &Config{
ProxyID: w.proxyID,
ProxiedServiceID: "web",
ProxiedServiceNamespace: "default",
PublicListener: PublicListenerConfig{
BindAddress: "10.10.10.10",
BindPort: "1010",
LocalServiceAddress: "127.0.0.1:5000",
HandshakeTimeoutMs: 999,
LocalConnectTimeoutMs: 1000, // from applyDefaults
},
Upstreams: []UpstreamConfig{
{
DestinationName: "db",
DestinationNamespace: "default",
DestinationType: "service",
LocalBindPort: 9191,
LocalBindAddress: "127.0.0.1",
ConnectTimeoutMs: 10000, // from applyDefaults
},
},
}
// nil this out as comparisons are problematic, we'll explicitly sanity check
// it's reasonable later.
assert.NotNil(t, cfg.service)
cfg.service = nil
assert.Equal(t, expectCfg, cfg)
// TODO(banks): Sanity check the service is viable and gets TLS certs eventually from
// the agent.
// Now keep watching and update the config.
go func() {
// Wait for watcher to be watching
time.Sleep(20 * time.Millisecond)
upstreams := reg.Connect.Proxy.Config["upstreams"].([]interface{})
upstreams = append(upstreams, map[string]interface{}{
"destination_name": "cache",
"local_bind_port": 9292,
"local_bind_address": "127.10.10.10",
})
reg.Connect.Proxy.Config["upstreams"] = upstreams
reg.Connect.Proxy.Config["local_connect_timeout_ms"] = 444
err := agent.ServiceRegister(reg)
require.NoError(t, err)
}()
cfg = testGetConfigValTimeout(t, w, 2*time.Second)
expectCfg.Upstreams = append(expectCfg.Upstreams, UpstreamConfig{
DestinationName: "cache",
DestinationNamespace: "default",
DestinationType: "service",
ConnectTimeoutMs: 10000, // from applyDefaults
LocalBindPort: 9292,
LocalBindAddress: "127.10.10.10",
})
expectCfg.PublicListener.LocalConnectTimeoutMs = 444
// nil this out as comparisons are problematic, we'll explicitly sanity check
// it's reasonable later.
assert.NotNil(t, cfg.service)
cfg.service = nil
assert.Equal(t, expectCfg, cfg)
}
func testGetConfigValTimeout(t *testing.T, w ConfigWatcher,
timeout time.Duration) *Config {
t.Helper()
select {
case cfg := <-w.Watch():
return cfg
case <-time.After(timeout):
t.Fatalf("timeout after %s waiting for config update", timeout)
return nil
}
}

3
connect/proxy/testdata/config-kitchensink.hcl vendored

@ -12,7 +12,8 @@ dev_service_cert_file = "connect/testdata/ca1-svc-web.cert.pem"
dev_service_key_file = "connect/testdata/ca1-svc-web.key.pem"
public_listener {
bind_address = ":9999"
bind_address = "127.0.0.1"
bind_port= "9999"
local_service_address = "127.0.0.1:5000"
}

3
watch/funcs.go

@ -3,7 +3,6 @@ package watch
import (
"context"
"fmt"
"log"
consulapi "github.com/hashicorp/consul/api"
)
@ -297,8 +296,6 @@ func connectProxyConfigWatch(params map[string]interface{}) (WatcherFunc, error)
opts := makeQueryOptionsWithContext(p, false)
defer p.cancelFunc()
log.Printf("DEBUG: id: %s, opts: %v", proxyServiceID, opts)
config, _, err := agent.ConnectProxyConfig(proxyServiceID, &opts)
if err != nil {
return nil, nil, err

Loading…
Cancel
Save