diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index b13e6d076e..fde7ca5e2b 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -28,7 +28,6 @@ import ( "github.com/hashicorp/serf/serf" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - // NOTE(mitcehllh): This is temporary while certs are stubbed out. ) type Self struct { @@ -1000,14 +999,71 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http } contentHash := fmt.Sprintf("%x", hash) + // Merge globals defaults + config := make(map[string]interface{}) + for k, v := range s.agent.config.ConnectProxyDefaultConfig { + if _, ok := config[k]; !ok { + config[k] = v + } + } + + execMode := "daemon" + // If there is a global default mode use that instead + if s.agent.config.ConnectProxyDefaultExecMode != "" { + execMode = s.agent.config.ConnectProxyDefaultExecMode + } + // If it's actually set though, use the one set + if proxy.Proxy.ExecMode != structs.ProxyExecModeUnspecified { + execMode = proxy.Proxy.ExecMode.String() + } + + // TODO(banks): default the binary to current binary. Probably needs to be + // done deeper though as it will be needed for actually managing proxy + // lifecycle. + command := proxy.Proxy.Command + if command == "" { + if execMode == "daemon" { + command = s.agent.config.ConnectProxyDefaultDaemonCommand + } + if execMode == "script" { + command = s.agent.config.ConnectProxyDefaultScriptCommand + } + } + // No global defaults set either... + if command == "" { + command = "consul connect proxy" + } + + // Set defaults for anything that is still not specified but required. + // Note that these are not included in the content hash. Since we expect + // them to be static in general but some like the default target service + // port might not be. In that edge case services can set that explicitly + // when they re-register which will be caught though. + for k, v := range proxy.Proxy.Config { + config[k] = v + } + if _, ok := config["bind_port"]; !ok { + config["bind_port"] = proxy.Proxy.ProxyService.Port + } + if _, ok := config["bind_address"]; !ok { + // Default to binding to the same address the agent is configured to + // bind to. + config["bind_address"] = s.agent.config.BindAddr.String() + } + if _, ok := config["local_service_address"]; !ok { + // Default to localhost and the port the service registered with + config["local_service_address"] = fmt.Sprintf("127.0.0.1:%d", + target.Port) + } + reply := &api.ConnectProxyConfig{ ProxyServiceID: proxy.Proxy.ProxyService.ID, TargetServiceID: target.ID, TargetServiceName: target.Service, ContentHash: contentHash, - ExecMode: api.ProxyExecMode(proxy.Proxy.ExecMode.String()), - Command: proxy.Proxy.Command, - Config: proxy.Proxy.Config, + ExecMode: api.ProxyExecMode(execMode), + Command: command, + Config: config, } return contentHash, reply, nil }) @@ -1040,10 +1096,13 @@ func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash stri // Apply a small amount of jitter to the request. wait += lib.RandomStagger(wait / 16) timeout = time.NewTimer(wait) - ws = memdb.NewWatchSet() } for { + // Must reset this every loop in case the Watch set is already closed but + // hash remains same. In that case we'll need to re-block on ws.Watch() + // again. + ws = memdb.NewWatchSet() curHash, curResp, err := fn(ws) if err != nil { return curResp, err diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index fad92cb9a8..be97bf5a4c 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -2316,7 +2316,7 @@ func requireLeafValidUnderCA(t *testing.T, issued *structs.IssuedCert, require.NoError(t, err) } -func TestAgentConnectProxy(t *testing.T) { +func TestAgentConnectProxyConfig_Blocking(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") @@ -2354,7 +2354,7 @@ func TestAgentConnectProxy(t *testing.T) { TargetServiceName: "test", ContentHash: "84346af2031659c9", ExecMode: "daemon", - Command: "", + Command: "consul connect proxy", Config: map[string]interface{}{ "upstreams": []interface{}{ map[string]interface{}{ @@ -2362,15 +2362,17 @@ func TestAgentConnectProxy(t *testing.T) { "local_port": float64(3131), }, }, - "bind_port": float64(1234), - "connect_timeout_ms": float64(500), + "bind_address": "127.0.0.1", + "local_service_address": "127.0.0.1:8000", + "bind_port": float64(1234), + "connect_timeout_ms": float64(500), }, } ur, err := copystructure.Copy(expectedResponse) require.NoError(t, err) updatedResponse := ur.(*api.ConnectProxyConfig) - updatedResponse.ContentHash = "7d53473b0e9db5a" + updatedResponse.ContentHash = "e1e3395f0d00cd41" upstreams := updatedResponse.Config["upstreams"].([]interface{}) upstreams = append(upstreams, map[string]interface{}{ @@ -2431,6 +2433,41 @@ func TestAgentConnectProxy(t *testing.T) { wantErr: false, wantResp: updatedResponse, }, + { + // This test exercises a case that caused a busy loop to eat CPU for the + // entire duration of the blocking query. If a service gets re-registered + // wth same proxy config then the old proxy config chan is closed causing + // blocked watchset.Watch to return false indicating a change. But since + // the hash is the same when the blocking fn is re-called we should just + // keep blocking on the next iteration. The bug hit was that the WatchSet + // ws was not being reset in the loop and so when you try to `Watch` it + // the second time it just returns immediately making the blocking loop + // into a busy-poll! + // + // This test though doesn't catch that because busy poll still has the + // correct external behaviour. I don't want to instrument the loop to + // assert it's not executing too fast here as I can't think of a clean way + // and the issue is fixed now so this test doesn't actually catch the + // error, but does provide an easy way to verify the behaviour by hand: + // 1. Make this test fail e.g. change wantErr to true + // 2. Add a log.Println or similar into the blocking loop/function + // 3. See whether it's called just once or many times in a tight loop. + name: "blocking fetch interrupted with no change (same hash)", + url: "/v1/agent/connect/proxy/test-proxy?wait=200ms&hash=" + expectedResponse.ContentHash, + updateFunc: func() { + time.Sleep(100 * time.Millisecond) + // Re-register with _same_ proxy config + req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(reg)) + resp := httptest.NewRecorder() + _, err = a.srv.AgentRegisterService(resp, req) + require.NoError(t, err) + require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String()) + }, + wantWait: 200 * time.Millisecond, + wantCode: 200, + wantErr: false, + wantResp: expectedResponse, + }, } for _, tt := range tests { @@ -2479,6 +2516,201 @@ func TestAgentConnectProxy(t *testing.T) { } } +func TestAgentConnectProxyConfig_ConfigHandling(t *testing.T) { + t.Parallel() + + // Define a local service with a managed proxy. It's registered in the test + // loop to make sure agent state is predictable whatever order tests execute + // since some alter this service config. + reg := &structs.ServiceDefinition{ + ID: "test-id", + Name: "test", + Address: "127.0.0.1", + Port: 8000, + Check: structs.CheckType{ + TTL: 15 * time.Second, + }, + Connect: &structs.ServiceDefinitionConnect{}, + } + + tests := []struct { + name string + globalConfig string + proxy structs.ServiceDefinitionConnectProxy + wantMode api.ProxyExecMode + wantCommand string + wantConfig map[string]interface{} + }{ + { + name: "defaults", + globalConfig: ` + bind_addr = "0.0.0.0" + connect { + enabled = true + proxy_defaults = { + bind_min_port = 10000 + bind_max_port = 10000 + } + } + `, + proxy: structs.ServiceDefinitionConnectProxy{}, + wantMode: api.ProxyExecModeDaemon, + wantCommand: "consul connect proxy", + wantConfig: map[string]interface{}{ + "bind_address": "0.0.0.0", + "bind_port": 10000, // "randomly" chosen from our range of 1 + "local_service_address": "127.0.0.1:8000", // port from service reg + }, + }, + { + name: "global defaults - script", + globalConfig: ` + bind_addr = "0.0.0.0" + connect { + enabled = true + proxy_defaults = { + bind_min_port = 10000 + bind_max_port = 10000 + exec_mode = "script" + script_command = "script.sh" + } + } + `, + proxy: structs.ServiceDefinitionConnectProxy{}, + wantMode: api.ProxyExecModeScript, + wantCommand: "script.sh", + wantConfig: map[string]interface{}{ + "bind_address": "0.0.0.0", + "bind_port": 10000, // "randomly" chosen from our range of 1 + "local_service_address": "127.0.0.1:8000", // port from service reg + }, + }, + { + name: "global defaults - daemon", + globalConfig: ` + bind_addr = "0.0.0.0" + connect { + enabled = true + proxy_defaults = { + bind_min_port = 10000 + bind_max_port = 10000 + exec_mode = "daemon" + daemon_command = "daemon.sh" + } + } + `, + proxy: structs.ServiceDefinitionConnectProxy{}, + wantMode: api.ProxyExecModeDaemon, + wantCommand: "daemon.sh", + wantConfig: map[string]interface{}{ + "bind_address": "0.0.0.0", + "bind_port": 10000, // "randomly" chosen from our range of 1 + "local_service_address": "127.0.0.1:8000", // port from service reg + }, + }, + { + name: "global default config merge", + globalConfig: ` + bind_addr = "0.0.0.0" + connect { + enabled = true + proxy_defaults = { + bind_min_port = 10000 + bind_max_port = 10000 + config = { + connect_timeout_ms = 1000 + } + } + } + `, + proxy: structs.ServiceDefinitionConnectProxy{ + Config: map[string]interface{}{ + "foo": "bar", + }, + }, + wantMode: api.ProxyExecModeDaemon, + wantCommand: "consul connect proxy", + wantConfig: map[string]interface{}{ + "bind_address": "0.0.0.0", + "bind_port": 10000, // "randomly" chosen from our range of 1 + "local_service_address": "127.0.0.1:8000", // port from service reg + "connect_timeout_ms": 1000, + "foo": "bar", + }, + }, + { + name: "overrides in reg", + globalConfig: ` + bind_addr = "0.0.0.0" + connect { + enabled = true + proxy_defaults = { + bind_min_port = 10000 + bind_max_port = 10000 + exec_mode = "daemon" + daemon_command = "daemon.sh" + script_command = "script.sh" + config = { + connect_timeout_ms = 1000 + } + } + } + `, + proxy: structs.ServiceDefinitionConnectProxy{ + ExecMode: "script", + Command: "foo.sh", + Config: map[string]interface{}{ + "connect_timeout_ms": 2000, + "bind_address": "127.0.0.1", + "bind_port": 1024, + "local_service_address": "127.0.0.1:9191", + }, + }, + wantMode: api.ProxyExecModeScript, + wantCommand: "foo.sh", + wantConfig: map[string]interface{}{ + "bind_address": "127.0.0.1", + "bind_port": float64(1024), + "local_service_address": "127.0.0.1:9191", + "connect_timeout_ms": float64(2000), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + a := NewTestAgent(t.Name(), tt.globalConfig) + defer a.Shutdown() + + // Register the basic service with the required config + { + reg.Connect.Proxy = &tt.proxy + req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(reg)) + resp := httptest.NewRecorder() + _, err := a.srv.AgentRegisterService(resp, req) + require.NoError(err) + require.Equal(200, resp.Code, "body: %s", resp.Body.String()) + } + + req, _ := http.NewRequest("GET", "/v1/agent/connect/proxy/test-id-proxy", nil) + resp := httptest.NewRecorder() + obj, err := a.srv.AgentConnectProxyConfig(resp, req) + require.NoError(err) + + proxyCfg := obj.(*api.ConnectProxyConfig) + assert.Equal("test-id-proxy", proxyCfg.ProxyServiceID) + assert.Equal("test-id", proxyCfg.TargetServiceID) + assert.Equal("test", proxyCfg.TargetServiceName) + assert.Equal(tt.wantMode, proxyCfg.ExecMode) + assert.Equal(tt.wantCommand, proxyCfg.Command) + require.Equal(tt.wantConfig, proxyCfg.Config) + }) + } +} + func TestAgentConnectAuthorize_badBody(t *testing.T) { t.Parallel() diff --git a/agent/config/builder.go b/agent/config/builder.go index 6ad6c70b54..3d9818adc8 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -531,6 +531,17 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { connectCAConfig = c.Connect.CAConfig } + proxyDefaultExecMode := "" + proxyDefaultDaemonCommand := "" + proxyDefaultScriptCommand := "" + proxyDefaultConfig := make(map[string]interface{}) + if c.Connect != nil && c.Connect.ProxyDefaults != nil { + proxyDefaultExecMode = b.stringVal(c.Connect.ProxyDefaults.ExecMode) + proxyDefaultDaemonCommand = b.stringVal(c.Connect.ProxyDefaults.DaemonCommand) + proxyDefaultScriptCommand = b.stringVal(c.Connect.ProxyDefaults.ScriptCommand) + proxyDefaultConfig = c.Connect.ProxyDefaults.Config + } + // ---------------------------------------------------------------- // build runtime config // @@ -638,100 +649,104 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { TelemetryStatsiteAddr: b.stringVal(c.Telemetry.StatsiteAddr), // Agent - AdvertiseAddrLAN: advertiseAddrLAN, - AdvertiseAddrWAN: advertiseAddrWAN, - BindAddr: bindAddr, - Bootstrap: b.boolVal(c.Bootstrap), - BootstrapExpect: b.intVal(c.BootstrapExpect), - CAFile: b.stringVal(c.CAFile), - CAPath: b.stringVal(c.CAPath), - CertFile: b.stringVal(c.CertFile), - CheckUpdateInterval: b.durationVal("check_update_interval", c.CheckUpdateInterval), - Checks: checks, - ClientAddrs: clientAddrs, - ConnectEnabled: connectEnabled, - ConnectProxyBindMinPort: proxyBindMinPort, - ConnectProxyBindMaxPort: proxyBindMaxPort, - ConnectCAProvider: connectCAProvider, - ConnectCAConfig: connectCAConfig, - DataDir: b.stringVal(c.DataDir), - Datacenter: strings.ToLower(b.stringVal(c.Datacenter)), - DevMode: b.boolVal(b.Flags.DevMode), - DisableAnonymousSignature: b.boolVal(c.DisableAnonymousSignature), - DisableCoordinates: b.boolVal(c.DisableCoordinates), - DisableHostNodeID: b.boolVal(c.DisableHostNodeID), - DisableKeyringFile: b.boolVal(c.DisableKeyringFile), - DisableRemoteExec: b.boolVal(c.DisableRemoteExec), - DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck), - DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput), - DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale), - EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks), - EnableDebug: b.boolVal(c.EnableDebug), - EnableScriptChecks: b.boolVal(c.EnableScriptChecks), - EnableSyslog: b.boolVal(c.EnableSyslog), - EnableUI: b.boolVal(c.UI), - EncryptKey: b.stringVal(c.EncryptKey), - EncryptVerifyIncoming: b.boolVal(c.EncryptVerifyIncoming), - EncryptVerifyOutgoing: b.boolVal(c.EncryptVerifyOutgoing), - KeyFile: b.stringVal(c.KeyFile), - LeaveDrainTime: b.durationVal("performance.leave_drain_time", c.Performance.LeaveDrainTime), - LeaveOnTerm: leaveOnTerm, - LogLevel: b.stringVal(c.LogLevel), - NodeID: types.NodeID(b.stringVal(c.NodeID)), - NodeMeta: c.NodeMeta, - NodeName: b.nodeName(c.NodeName), - NonVotingServer: b.boolVal(c.NonVotingServer), - PidFile: b.stringVal(c.PidFile), - RPCAdvertiseAddr: rpcAdvertiseAddr, - RPCBindAddr: rpcBindAddr, - RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout), - RPCMaxBurst: b.intVal(c.Limits.RPCMaxBurst), - RPCProtocol: b.intVal(c.RPCProtocol), - RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)), - RaftProtocol: b.intVal(c.RaftProtocol), - RaftSnapshotThreshold: b.intVal(c.RaftSnapshotThreshold), - RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval), - ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN), - ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN), - RejoinAfterLeave: b.boolVal(c.RejoinAfterLeave), - RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN), - RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN), - RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN), - RetryJoinMaxAttemptsLAN: b.intVal(c.RetryJoinMaxAttemptsLAN), - RetryJoinMaxAttemptsWAN: b.intVal(c.RetryJoinMaxAttemptsWAN), - RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN), - SegmentName: b.stringVal(c.SegmentName), - Segments: segments, - SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN, - SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN, - SerfBindAddrLAN: serfBindAddrLAN, - SerfBindAddrWAN: serfBindAddrWAN, - SerfPortLAN: serfPortLAN, - SerfPortWAN: serfPortWAN, - ServerMode: b.boolVal(c.ServerMode), - ServerName: b.stringVal(c.ServerName), - ServerPort: serverPort, - Services: services, - SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin), - SkipLeaveOnInt: skipLeaveOnInt, - StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN), - StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN), - SyslogFacility: b.stringVal(c.SyslogFacility), - TLSCipherSuites: b.tlsCipherSuites("tls_cipher_suites", c.TLSCipherSuites), - TLSMinVersion: b.stringVal(c.TLSMinVersion), - TLSPreferServerCipherSuites: b.boolVal(c.TLSPreferServerCipherSuites), - TaggedAddresses: c.TaggedAddresses, - TranslateWANAddrs: b.boolVal(c.TranslateWANAddrs), - UIDir: b.stringVal(c.UIDir), - UnixSocketGroup: b.stringVal(c.UnixSocket.Group), - UnixSocketMode: b.stringVal(c.UnixSocket.Mode), - UnixSocketUser: b.stringVal(c.UnixSocket.User), - VerifyIncoming: b.boolVal(c.VerifyIncoming), - VerifyIncomingHTTPS: b.boolVal(c.VerifyIncomingHTTPS), - VerifyIncomingRPC: b.boolVal(c.VerifyIncomingRPC), - VerifyOutgoing: b.boolVal(c.VerifyOutgoing), - VerifyServerHostname: b.boolVal(c.VerifyServerHostname), - Watches: c.Watches, + AdvertiseAddrLAN: advertiseAddrLAN, + AdvertiseAddrWAN: advertiseAddrWAN, + BindAddr: bindAddr, + Bootstrap: b.boolVal(c.Bootstrap), + BootstrapExpect: b.intVal(c.BootstrapExpect), + CAFile: b.stringVal(c.CAFile), + CAPath: b.stringVal(c.CAPath), + CertFile: b.stringVal(c.CertFile), + CheckUpdateInterval: b.durationVal("check_update_interval", c.CheckUpdateInterval), + Checks: checks, + ClientAddrs: clientAddrs, + ConnectEnabled: connectEnabled, + ConnectCAProvider: connectCAProvider, + ConnectCAConfig: connectCAConfig, + ConnectProxyBindMinPort: proxyBindMinPort, + ConnectProxyBindMaxPort: proxyBindMaxPort, + ConnectProxyDefaultExecMode: proxyDefaultExecMode, + ConnectProxyDefaultDaemonCommand: proxyDefaultDaemonCommand, + ConnectProxyDefaultScriptCommand: proxyDefaultScriptCommand, + ConnectProxyDefaultConfig: proxyDefaultConfig, + DataDir: b.stringVal(c.DataDir), + Datacenter: strings.ToLower(b.stringVal(c.Datacenter)), + DevMode: b.boolVal(b.Flags.DevMode), + DisableAnonymousSignature: b.boolVal(c.DisableAnonymousSignature), + DisableCoordinates: b.boolVal(c.DisableCoordinates), + DisableHostNodeID: b.boolVal(c.DisableHostNodeID), + DisableKeyringFile: b.boolVal(c.DisableKeyringFile), + DisableRemoteExec: b.boolVal(c.DisableRemoteExec), + DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck), + DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput), + DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale), + EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks), + EnableDebug: b.boolVal(c.EnableDebug), + EnableScriptChecks: b.boolVal(c.EnableScriptChecks), + EnableSyslog: b.boolVal(c.EnableSyslog), + EnableUI: b.boolVal(c.UI), + EncryptKey: b.stringVal(c.EncryptKey), + EncryptVerifyIncoming: b.boolVal(c.EncryptVerifyIncoming), + EncryptVerifyOutgoing: b.boolVal(c.EncryptVerifyOutgoing), + KeyFile: b.stringVal(c.KeyFile), + LeaveDrainTime: b.durationVal("performance.leave_drain_time", c.Performance.LeaveDrainTime), + LeaveOnTerm: leaveOnTerm, + LogLevel: b.stringVal(c.LogLevel), + NodeID: types.NodeID(b.stringVal(c.NodeID)), + NodeMeta: c.NodeMeta, + NodeName: b.nodeName(c.NodeName), + NonVotingServer: b.boolVal(c.NonVotingServer), + PidFile: b.stringVal(c.PidFile), + RPCAdvertiseAddr: rpcAdvertiseAddr, + RPCBindAddr: rpcBindAddr, + RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout), + RPCMaxBurst: b.intVal(c.Limits.RPCMaxBurst), + RPCProtocol: b.intVal(c.RPCProtocol), + RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)), + RaftProtocol: b.intVal(c.RaftProtocol), + RaftSnapshotThreshold: b.intVal(c.RaftSnapshotThreshold), + RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval), + ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN), + ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN), + RejoinAfterLeave: b.boolVal(c.RejoinAfterLeave), + RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN), + RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN), + RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN), + RetryJoinMaxAttemptsLAN: b.intVal(c.RetryJoinMaxAttemptsLAN), + RetryJoinMaxAttemptsWAN: b.intVal(c.RetryJoinMaxAttemptsWAN), + RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN), + SegmentName: b.stringVal(c.SegmentName), + Segments: segments, + SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN, + SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN, + SerfBindAddrLAN: serfBindAddrLAN, + SerfBindAddrWAN: serfBindAddrWAN, + SerfPortLAN: serfPortLAN, + SerfPortWAN: serfPortWAN, + ServerMode: b.boolVal(c.ServerMode), + ServerName: b.stringVal(c.ServerName), + ServerPort: serverPort, + Services: services, + SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin), + SkipLeaveOnInt: skipLeaveOnInt, + StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN), + StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN), + SyslogFacility: b.stringVal(c.SyslogFacility), + TLSCipherSuites: b.tlsCipherSuites("tls_cipher_suites", c.TLSCipherSuites), + TLSMinVersion: b.stringVal(c.TLSMinVersion), + TLSPreferServerCipherSuites: b.boolVal(c.TLSPreferServerCipherSuites), + TaggedAddresses: c.TaggedAddresses, + TranslateWANAddrs: b.boolVal(c.TranslateWANAddrs), + UIDir: b.stringVal(c.UIDir), + UnixSocketGroup: b.stringVal(c.UnixSocket.Group), + UnixSocketMode: b.stringVal(c.UnixSocket.Mode), + UnixSocketUser: b.stringVal(c.UnixSocket.User), + VerifyIncoming: b.boolVal(c.VerifyIncoming), + VerifyIncomingHTTPS: b.boolVal(c.VerifyIncomingHTTPS), + VerifyIncomingRPC: b.boolVal(c.VerifyIncomingRPC), + VerifyOutgoing: b.boolVal(c.VerifyOutgoing), + VerifyServerHostname: b.boolVal(c.VerifyServerHostname), + Watches: c.Watches, } if rt.BootstrapExpect == 1 { diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 15a7ac2bab..ea04d5aa0e 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -633,15 +633,15 @@ type RuntimeConfig struct { // ConnectProxyDefaultExecMode is used where a registration doesn't include an // exec_mode. Defaults to daemon. - ConnectProxyDefaultExecMode *string + ConnectProxyDefaultExecMode string // ConnectProxyDefaultDaemonCommand is used to start proxy in exec_mode = // daemon if not specified at registration time. - ConnectProxyDefaultDaemonCommand *string + ConnectProxyDefaultDaemonCommand string // ConnectProxyDefaultScriptCommand is used to start proxy in exec_mode = // script if not specified at registration time. - ConnectProxyDefaultScriptCommand *string + ConnectProxyDefaultScriptCommand string // ConnectProxyDefaultConfig is merged with any config specified at // registration time to allow global control of defaults. diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 9dff7733b8..36fffe16a6 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -2830,7 +2830,9 @@ func TestFullConfig(t *testing.T) { script_command = "proxyctl.sh" config = { foo = "bar" - connect_timeout_ms = 1000 + # hack float since json parses numbers as float and we have to + # assert against the same thing + connect_timeout_ms = 1000.0 pedantic_mode = true } } @@ -3423,6 +3425,14 @@ func TestFullConfig(t *testing.T) { "g4cvJyys": "IRLXE9Ds", "hyMy9Oxn": "XeBp4Sis", }, + ConnectProxyDefaultExecMode: "script", + ConnectProxyDefaultDaemonCommand: "consul connect proxy", + ConnectProxyDefaultScriptCommand: "proxyctl.sh", + ConnectProxyDefaultConfig: map[string]interface{}{ + "foo": "bar", + "connect_timeout_ms": float64(1000), + "pedantic_mode": true, + }, DNSAddrs: []net.Addr{tcpAddr("93.95.95.81:7001"), udpAddr("93.95.95.81:7001")}, DNSARecordLimit: 29907, DNSAllowStale: true, @@ -4099,9 +4109,9 @@ func TestSanitize(t *testing.T) { "ConnectProxyBindMaxPort": 0, "ConnectProxyBindMinPort": 0, "ConnectProxyDefaultConfig": {}, - "ConnectProxyDefaultDaemonCommand": null, - "ConnectProxyDefaultExecMode": null, - "ConnectProxyDefaultScriptCommand": null, + "ConnectProxyDefaultDaemonCommand": "", + "ConnectProxyDefaultExecMode": "", + "ConnectProxyDefaultScriptCommand": "", "ConsulCoordinateUpdateBatchSize": 0, "ConsulCoordinateUpdateMaxBatches": 0, "ConsulCoordinateUpdatePeriod": "15s", diff --git a/agent/connect/testing_ca.go b/agent/connect/testing_ca.go index fbb5eed499..552c575353 100644 --- a/agent/connect/testing_ca.go +++ b/agent/connect/testing_ca.go @@ -150,7 +150,7 @@ func TestLeaf(t testing.T, service string, root *structs.CARoot) (string, string spiffeId := &SpiffeIDService{ Host: fmt.Sprintf("%s.consul", testClusterID), Namespace: "default", - Datacenter: "dc01", + Datacenter: "dc1", Service: service, } diff --git a/agent/connect/testing_spiffe.go b/agent/connect/testing_spiffe.go index e2e7a470f9..d6a70cb81e 100644 --- a/agent/connect/testing_spiffe.go +++ b/agent/connect/testing_spiffe.go @@ -9,7 +9,7 @@ func TestSpiffeIDService(t testing.T, service string) *SpiffeIDService { return &SpiffeIDService{ Host: testClusterID + ".consul", Namespace: "default", - Datacenter: "dc01", + Datacenter: "dc1", Service: service, } } diff --git a/agent/http_oss.go b/agent/http_oss.go index d9b8068ef2..9b9857e40a 100644 --- a/agent/http_oss.go +++ b/agent/http_oss.go @@ -48,7 +48,7 @@ func init() { registerEndpoint("/v1/connect/ca/roots", []string{"GET"}, (*HTTPServer).ConnectCARoots) registerEndpoint("/v1/connect/intentions", []string{"GET", "POST"}, (*HTTPServer).IntentionEndpoint) registerEndpoint("/v1/connect/intentions/match", []string{"GET"}, (*HTTPServer).IntentionMatch) - registerEndpoint("/v1/connect/intentions/", []string{"GET"}, (*HTTPServer).IntentionSpecific) + registerEndpoint("/v1/connect/intentions/", []string{"GET", "PUT", "DELETE"}, (*HTTPServer).IntentionSpecific) registerEndpoint("/v1/coordinate/datacenters", []string{"GET"}, (*HTTPServer).CoordinateDatacenters) registerEndpoint("/v1/coordinate/nodes", []string{"GET"}, (*HTTPServer).CoordinateNodes) registerEndpoint("/v1/coordinate/node/", []string{"GET"}, (*HTTPServer).CoordinateNode) diff --git a/agent/local/state.go b/agent/local/state.go index 839b3cdb20..8df600b32f 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -608,7 +608,12 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str l.Lock() defer l.Unlock() - // Allocate port if needed (min and max inclusive) + // Does this proxy instance allready exist? + if existing, ok := l.managedProxies[svc.ID]; ok { + svc.Port = existing.Proxy.ProxyService.Port + } + + // Allocate port if needed (min and max inclusive). rangeLen := l.config.ProxyBindMaxPort - l.config.ProxyBindMinPort + 1 if svc.Port < 1 && l.config.ProxyBindMinPort > 0 && rangeLen > 0 { // This should be a really short list so don't bother optimising lookup yet. diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 16975d9633..dd887ccb12 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1721,6 +1721,21 @@ func TestStateProxyManagement(t *testing.T) { // Port is non-deterministic but could be either of 20000 or 20001 assert.Contains([]int{20000, 20001}, svc.Port) + { + // Re-registering same proxy again should not pick a random port but re-use + // the assigned one. + svcDup, err := state.AddProxy(&p1, "fake-token") + require.NoError(err) + + assert.Equal("web-proxy", svcDup.ID) + assert.Equal("web-proxy", svcDup.Service) + assert.Equal(structs.ServiceKindConnectProxy, svcDup.Kind) + assert.Equal("web", svcDup.ProxyDestination) + assert.Equal("", svcDup.Address, "should have empty address by default") + // Port must be same as before + assert.Equal(svc.Port, svcDup.Port) + } + // Second proxy should claim other port p2 := p1 p2.TargetServiceID = "cache" diff --git a/agent/structs/connect.go b/agent/structs/connect.go index 20970c1bf8..90513ae8c4 100644 --- a/agent/structs/connect.go +++ b/agent/structs/connect.go @@ -24,8 +24,11 @@ type ConnectAuthorizeRequest struct { type ProxyExecMode int const ( + // ProxyExecModeUnspecified uses the global default proxy mode. + ProxyExecModeUnspecified ProxyExecMode = iota + // ProxyExecModeDaemon executes a proxy process as a supervised daemon. - ProxyExecModeDaemon ProxyExecMode = iota + ProxyExecModeDaemon // ProxyExecModeScript executes a proxy config script on each change to it's // config. @@ -35,6 +38,8 @@ const ( // String implements Stringer func (m ProxyExecMode) String() string { switch m { + case ProxyExecModeUnspecified: + return "global_default" case ProxyExecModeDaemon: return "daemon" case ProxyExecModeScript: diff --git a/agent/structs/service_definition.go b/agent/structs/service_definition.go index 2ed4241781..d4dc214149 100644 --- a/agent/structs/service_definition.go +++ b/agent/structs/service_definition.go @@ -55,10 +55,11 @@ func (s *ServiceDefinition) ConnectManagedProxy() (*ConnectManagedProxy, error) // which we shouldn't hard code ourselves here... ns := s.NodeService() - execMode := ProxyExecModeDaemon + execMode := ProxyExecModeUnspecified switch s.Connect.Proxy.ExecMode { case "": - execMode = ProxyExecModeDaemon + // Use default + break case "daemon": execMode = ProxyExecModeDaemon case "script": diff --git a/api/agent.go b/api/agent.go index a81fd96f8c..b8125c91e1 100644 --- a/api/agent.go +++ b/api/agent.go @@ -609,9 +609,6 @@ func (a *Agent) ConnectCARoots(q *QueryOptions) (*CARootList, *QueryMeta, error) } // ConnectCALeaf gets the leaf certificate for the given service ID. -// -// TODO(mitchellh): we need to test this better once we have a way to -// configure CAs from the API package (when the CA work is done). func (a *Agent) ConnectCALeaf(serviceID string, q *QueryOptions) (*LeafCert, *QueryMeta, error) { r := a.c.newRequest("GET", "/v1/agent/connect/ca/leaf/"+serviceID) r.setQueryOptions(q) diff --git a/api/agent_test.go b/api/agent_test.go index 8cc58e012c..1f816c23a3 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -1049,17 +1049,71 @@ func TestAPI_AgentConnectCARoots_empty(t *testing.T) { agent := c.Agent() list, meta, err := agent.ConnectCARoots(nil) - require.Nil(err) + require.NoError(err) require.Equal(uint64(0), meta.LastIndex) require.Len(list.Roots, 0) } +func TestAPI_AgentConnectCARoots_list(t *testing.T) { + t.Parallel() + + require := require.New(t) + c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { + // Force auto port range to 1 port so we have deterministic response. + c.Connect = map[string]interface{}{ + "enabled": true, + } + }) + defer s.Stop() + + agent := c.Agent() + list, meta, err := agent.ConnectCARoots(nil) + require.NoError(err) + require.True(meta.LastIndex > 0) + require.Len(list.Roots, 1) +} + +func TestAPI_AgentConnectCALeaf(t *testing.T) { + t.Parallel() + + require := require.New(t) + c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { + // Force auto port range to 1 port so we have deterministic response. + c.Connect = map[string]interface{}{ + "enabled": true, + } + }) + defer s.Stop() + + agent := c.Agent() + // Setup service + reg := &AgentServiceRegistration{ + Name: "foo", + Tags: []string{"bar", "baz"}, + Port: 8000, + } + require.NoError(agent.ServiceRegister(reg)) + + leaf, meta, err := agent.ConnectCALeaf("foo", nil) + require.NoError(err) + require.True(meta.LastIndex > 0) + // Sanity checks here as we have actual certificate validation checks at many + // other levels. + require.NotEmpty(leaf.SerialNumber) + require.NotEmpty(leaf.CertPEM) + require.NotEmpty(leaf.PrivateKeyPEM) + require.Equal("foo", leaf.Service) + require.True(strings.HasSuffix(leaf.ServiceURI, "/svc/foo")) + require.True(leaf.ModifyIndex > 0) + require.True(leaf.ValidAfter.Before(time.Now())) + require.True(leaf.ValidBefore.After(time.Now())) +} + // TODO(banks): once we have CA stuff setup properly we can probably make this // much more complete. This is just a sanity check that the agent code basically // works. func TestAPI_AgentConnectAuthorize(t *testing.T) { t.Parallel() - require := require.New(t) c, s := makeClient(t) defer s.Stop() @@ -1079,7 +1133,15 @@ func TestAPI_AgentConnectAuthorize(t *testing.T) { func TestAPI_AgentConnectProxyConfig(t *testing.T) { t.Parallel() - c, s := makeClient(t) + c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { + // Force auto port range to 1 port so we have deterministic response. + c.Connect = map[string]interface{}{ + "proxy_defaults": map[string]interface{}{ + "bind_min_port": 20000, + "bind_max_port": 20000, + }, + } + }) defer s.Stop() agent := c.Agent() @@ -1107,9 +1169,12 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) { TargetServiceName: "foo", ContentHash: "e662ea8600d84cf0", ExecMode: "daemon", - Command: "", + Command: "consul connect proxy", Config: map[string]interface{}{ - "foo": "bar", + "bind_address": "127.0.0.1", + "bind_port": float64(20000), + "foo": "bar", + "local_service_address": "127.0.0.1:8000", }, } require.Equal(t, expectConfig, config) diff --git a/connect/proxy/config.go b/connect/proxy/config.go index 6fad0bd559..840afa8961 100644 --- a/connect/proxy/config.go +++ b/connect/proxy/config.go @@ -52,11 +52,6 @@ type Config struct { // private key to be used in development instead of the ones supplied by // Connect. DevServiceKeyFile string `json:"dev_service_key_file" hcl:"dev_service_key_file"` - - // service is a connect.Service instance representing the proxied service. It - // is created internally by the code responsible for setting up config as it - // may depend on other external dependencies - service *connect.Service } // PublicListenerConfig contains the parameters needed for the incoming mTLS @@ -89,6 +84,9 @@ func (plc *PublicListenerConfig) applyDefaults() { if plc.HandshakeTimeoutMs == 0 { plc.HandshakeTimeoutMs = 10000 } + if plc.BindAddress == "" { + plc.BindAddress = "0.0.0.0" + } } // UpstreamConfig configures an upstream (outgoing) listener. @@ -258,7 +256,6 @@ func NewAgentConfigWatcher(client *api.Client, proxyID string, func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal, val interface{}) { - log.Printf("DEBUG: got hash %s", blockVal.(watch.WaitHashVal)) resp, ok := val.(*api.ConnectProxyConfig) if !ok { @@ -266,25 +263,16 @@ func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal, return } - // Setup Service instance now we know target ID etc - service, err := connect.NewService(resp.TargetServiceID, w.client) - if err != nil { - w.logger.Printf("[WARN] proxy config watch failed to initialize"+ - " service: %s", err) - return - } - // Create proxy config from the response cfg := &Config{ ProxyID: w.proxyID, // Token should be already setup in the client ProxiedServiceID: resp.TargetServiceID, ProxiedServiceNamespace: "default", - service: service, } // Unmarshal configs - err = mapstructure.Decode(resp.Config, &cfg.PublicListener) + err := mapstructure.Decode(resp.Config, &cfg.PublicListener) if err != nil { w.logger.Printf("[ERR] proxy config watch public listener config "+ "couldn't be parsed: %s", err) diff --git a/connect/proxy/config_test.go b/connect/proxy/config_test.go index e576d5f82a..1473e8fea6 100644 --- a/connect/proxy/config_test.go +++ b/connect/proxy/config_test.go @@ -175,11 +175,6 @@ func TestAgentConfigWatcher(t *testing.T) { }, } - // nil this out as comparisons are problematic, we'll explicitly sanity check - // it's reasonable later. - assert.NotNil(t, cfg.service) - cfg.service = nil - assert.Equal(t, expectCfg, cfg) // TODO(banks): Sanity check the service is viable and gets TLS certs eventually from @@ -213,11 +208,6 @@ func TestAgentConfigWatcher(t *testing.T) { }) expectCfg.PublicListener.LocalConnectTimeoutMs = 444 - // nil this out as comparisons are problematic, we'll explicitly sanity check - // it's reasonable later. - assert.NotNil(t, cfg.service) - cfg.service = nil - assert.Equal(t, expectCfg, cfg) } diff --git a/connect/proxy/listener.go b/connect/proxy/listener.go index 12134f8401..33f1f5292b 100644 --- a/connect/proxy/listener.go +++ b/connect/proxy/listener.go @@ -20,8 +20,10 @@ type Listener struct { // Service is the connect service instance to use. Service *connect.Service + // listenFunc, dialFunc and bindAddr are set by type-specific constructors listenFunc func() (net.Listener, error) dialFunc func() (net.Conn, error) + bindAddr string stopFlag int32 stopChan chan struct{} @@ -42,17 +44,17 @@ type Listener struct { // connections and proxy them to the configured local application over TCP. func NewPublicListener(svc *connect.Service, cfg PublicListenerConfig, logger *log.Logger) *Listener { + bindAddr := fmt.Sprintf("%s:%d", cfg.BindAddress, cfg.BindPort) return &Listener{ Service: svc, listenFunc: func() (net.Listener, error) { - return tls.Listen("tcp", - fmt.Sprintf("%s:%d", cfg.BindAddress, cfg.BindPort), - svc.ServerTLSConfig()) + return tls.Listen("tcp", bindAddr, svc.ServerTLSConfig()) }, dialFunc: func() (net.Conn, error) { return net.DialTimeout("tcp", cfg.LocalServiceAddress, time.Duration(cfg.LocalConnectTimeoutMs)*time.Millisecond) }, + bindAddr: bindAddr, stopChan: make(chan struct{}), listeningChan: make(chan struct{}), logger: logger, @@ -63,11 +65,11 @@ func NewPublicListener(svc *connect.Service, cfg PublicListenerConfig, // connections that are proxied to a discovered Connect service instance. func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig, logger *log.Logger) *Listener { + bindAddr := fmt.Sprintf("%s:%d", cfg.LocalBindAddress, cfg.LocalBindPort) return &Listener{ Service: svc, listenFunc: func() (net.Listener, error) { - return net.Listen("tcp", - fmt.Sprintf("%s:%d", cfg.LocalBindAddress, cfg.LocalBindPort)) + return net.Listen("tcp", bindAddr) }, dialFunc: func() (net.Conn, error) { if cfg.resolver == nil { @@ -78,6 +80,7 @@ func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig, defer cancel() return svc.Dial(ctx, cfg.resolver) }, + bindAddr: bindAddr, stopChan: make(chan struct{}), listeningChan: make(chan struct{}), logger: logger, @@ -142,3 +145,8 @@ func (l *Listener) Close() error { func (l *Listener) Wait() { <-l.listeningChan } + +// BindAddr returns the address the listen is bound to. +func (l *Listener) BindAddr() string { + return l.bindAddr +} diff --git a/connect/proxy/proxy.go b/connect/proxy/proxy.go index bda6f3afbd..717d45ae6d 100644 --- a/connect/proxy/proxy.go +++ b/connect/proxy/proxy.go @@ -1,6 +1,8 @@ package proxy import ( + "bytes" + "crypto/x509" "log" "github.com/hashicorp/consul/api" @@ -14,6 +16,7 @@ type Proxy struct { cfgWatcher ConfigWatcher stopChan chan struct{} logger *log.Logger + service *connect.Service } // NewFromConfigFile returns a Proxy instance configured just from a local file. @@ -27,12 +30,11 @@ func NewFromConfigFile(client *api.Client, filename string, } service, err := connect.NewDevServiceFromCertFiles(cfg.ProxiedServiceID, - client, logger, cfg.DevCAFile, cfg.DevServiceCertFile, + logger, cfg.DevCAFile, cfg.DevServiceCertFile, cfg.DevServiceKeyFile) if err != nil { return nil, err } - cfg.service = service p := &Proxy{ proxyID: cfg.ProxyID, @@ -40,6 +42,7 @@ func NewFromConfigFile(client *api.Client, filename string, cfgWatcher: NewStaticConfigWatcher(cfg), stopChan: make(chan struct{}), logger: logger, + service: service, } return p, nil } @@ -47,16 +50,18 @@ func NewFromConfigFile(client *api.Client, filename string, // New returns a Proxy with the given id, consuming the provided (configured) // agent. It is ready to Run(). func New(client *api.Client, proxyID string, logger *log.Logger) (*Proxy, error) { + cw, err := NewAgentConfigWatcher(client, proxyID, logger) + if err != nil { + return nil, err + } p := &Proxy{ - proxyID: proxyID, - client: client, - cfgWatcher: &AgentConfigWatcher{ - client: client, - proxyID: proxyID, - logger: logger, - }, - stopChan: make(chan struct{}), - logger: logger, + proxyID: proxyID, + client: client, + cfgWatcher: cw, + stopChan: make(chan struct{}), + logger: logger, + // Can't load service yet as we only have the proxy's ID not the service's + // until initial config fetch happens. } return p, nil } @@ -71,16 +76,29 @@ func (p *Proxy) Serve() error { select { case newCfg := <-p.cfgWatcher.Watch(): p.logger.Printf("[DEBUG] got new config") - if newCfg.service == nil { - p.logger.Printf("[ERR] new config has nil service") - continue - } + if cfg == nil { // Initial setup + // Setup Service instance now we know target ID etc + service, err := connect.NewService(newCfg.ProxiedServiceID, p.client) + if err != nil { + return err + } + p.service = service + + go func() { + <-service.ReadyWait() + p.logger.Printf("[INFO] proxy loaded config and ready to serve") + tcfg := service.ServerTLSConfig() + cert, _ := tcfg.GetCertificate(nil) + leaf, _ := x509.ParseCertificate(cert.Certificate[0]) + p.logger.Printf("[DEBUG] leaf: %s roots: %s", leaf.URIs[0], bytes.Join(tcfg.RootCAs.Subjects(), []byte(","))) + }() + newCfg.PublicListener.applyDefaults() - l := NewPublicListener(newCfg.service, newCfg.PublicListener, p.logger) - err := p.startListener("public listener", l) + l := NewPublicListener(p.service, newCfg.PublicListener, p.logger) + err = p.startListener("public listener", l) if err != nil { return err } @@ -93,7 +111,13 @@ func (p *Proxy) Serve() error { uc.applyDefaults() uc.resolver = UpstreamResolverFromClient(p.client, uc) - l := NewUpstreamListener(newCfg.service, uc, p.logger) + if uc.LocalBindPort < 1 { + p.logger.Printf("[ERR] upstream %s has no local_bind_port. "+ + "Can't start upstream.", uc.String()) + continue + } + + l := NewUpstreamListener(p.service, uc, p.logger) err := p.startListener(uc.String(), l) if err != nil { p.logger.Printf("[ERR] failed to start upstream %s: %s", uc.String(), @@ -110,6 +134,7 @@ func (p *Proxy) Serve() error { // startPublicListener is run from the internal state machine loop func (p *Proxy) startListener(name string, l *Listener) error { + p.logger.Printf("[INFO] %s starting on %s", name, l.BindAddr()) go func() { err := l.Serve() if err != nil { @@ -122,6 +147,7 @@ func (p *Proxy) startListener(name string, l *Listener) error { go func() { <-p.stopChan l.Close() + }() return nil @@ -131,4 +157,7 @@ func (p *Proxy) startListener(name string, l *Listener) error { // called only once. func (p *Proxy) Close() { close(p.stopChan) + if p.service != nil { + p.service.Close() + } } diff --git a/connect/resolver.go b/connect/resolver.go index 9873fcdf1f..98d8c88d37 100644 --- a/connect/resolver.go +++ b/connect/resolver.go @@ -7,7 +7,6 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/api" - testing "github.com/mitchellh/go-testing-interface" ) // Resolver is the interface implemented by a service discovery mechanism to get @@ -122,7 +121,12 @@ func (cr *ConsulResolver) resolveService(ctx context.Context) (string, connect.C // propagating these trust domains we need to actually fetch the trust domain // somehow. We also need to implement namespaces. Use of test function here is // temporary pending the work on trust domains. - certURI := connect.TestSpiffeIDService(&testing.RuntimeT{}, cr.Name) + certURI := &connect.SpiffeIDService{ + Host: "11111111-2222-3333-4444-555555555555.consul", + Namespace: "default", + Datacenter: svcs[idx].Node.Datacenter, + Service: svcs[idx].Service.ProxyDestination, + } return fmt.Sprintf("%s:%d", addr, port), certURI, nil } diff --git a/connect/service.go b/connect/service.go index 4f38558a3f..af9fbfcb7a 100644 --- a/connect/service.go +++ b/connect/service.go @@ -41,7 +41,8 @@ type Service struct { // fetch certificates and print a loud error message. It will not Close() or // kill the process since that could lead to a crash loop in every service if // ACL token was revoked. All attempts to dial will error and any incoming - // connections will fail to verify. + // connections will fail to verify. It may be nil if the Service is being + // configured from local files for development or testing. client *api.Client // tlsCfg is the dynamic TLS config @@ -63,6 +64,10 @@ type Service struct { // NewService creates and starts a Service. The caller must close the returned // service to free resources and allow the program to exit normally. This is // typically called in a signal handler. +// +// Caller must provide client which is already configured to speak to the local +// Consul agent, and with an ACL token that has `service:write` privileges for +// the serviceID specified. func NewService(serviceID string, client *api.Client) (*Service, error) { return NewServiceWithLogger(serviceID, client, log.New(os.Stderr, "", log.LstdFlags)) @@ -89,7 +94,8 @@ func NewServiceWithLogger(serviceID string, client *api.Client, s.rootsWatch.HybridHandler = s.rootsWatchHandler p, err = watch.Parse(map[string]interface{}{ - "type": "connect_leaf", + "type": "connect_leaf", + "service_id": s.serviceID, }) if err != nil { return nil, err @@ -97,26 +103,33 @@ func NewServiceWithLogger(serviceID string, client *api.Client, s.leafWatch = p s.leafWatch.HybridHandler = s.leafWatchHandler - //go s.rootsWatch.RunWithClientAndLogger(s.client, s.logger) - //go s.leafWatch.RunWithClientAndLogger(s.client, s.logger) + go s.rootsWatch.RunWithClientAndLogger(client, s.logger) + go s.leafWatch.RunWithClientAndLogger(client, s.logger) return s, nil } // NewDevServiceFromCertFiles creates a Service using certificate and key files // passed instead of fetching them from the client. -func NewDevServiceFromCertFiles(serviceID string, client *api.Client, - logger *log.Logger, caFile, certFile, keyFile string) (*Service, error) { - s := &Service{ - serviceID: serviceID, - client: client, - logger: logger, - } +func NewDevServiceFromCertFiles(serviceID string, logger *log.Logger, + caFile, certFile, keyFile string) (*Service, error) { + tlsCfg, err := devTLSConfigFromFiles(caFile, certFile, keyFile) if err != nil { return nil, err } - s.tlsCfg = newDynamicTLSConfig(tlsCfg) + return NewDevServiceWithTLSConfig(serviceID, logger, tlsCfg) +} + +// NewDevServiceWithTLSConfig creates a Service using static TLS config passed. +// It's mostly useful for testing. +func NewDevServiceWithTLSConfig(serviceID string, logger *log.Logger, + tlsCfg *tls.Config) (*Service, error) { + s := &Service{ + serviceID: serviceID, + logger: logger, + tlsCfg: newDynamicTLSConfig(tlsCfg), + } return s, nil } @@ -274,3 +287,17 @@ func (s *Service) leafWatchHandler(blockParam watch.BlockingParamVal, raw interf s.tlsCfg.SetLeaf(&cert) } + +// Ready returns whether or not both roots and a leaf certificate are +// configured. If both are non-nil, they are assumed to be valid and usable. +func (s *Service) Ready() bool { + return s.tlsCfg.Ready() +} + +// ReadyWait returns a chan that is closed when the the Service becomes ready +// for use. Note that if the Service is ready when it is called it returns a nil +// chan. Ready means that it has root and leaf certificates configured which we +// assume are valid. +func (s *Service) ReadyWait() <-chan struct{} { + return s.tlsCfg.ReadyWait() +} diff --git a/connect/service_test.go b/connect/service_test.go index 20433d1f5f..64ca28fc7b 100644 --- a/connect/service_test.go +++ b/connect/service_test.go @@ -1,16 +1,21 @@ package connect import ( + "bytes" "context" "crypto/tls" + "crypto/x509" "fmt" "io" "io/ioutil" "net/http" + "strings" "testing" "time" + "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testutil/retry" "github.com/stretchr/testify/require" ) @@ -111,10 +116,91 @@ func TestService_Dial(t *testing.T) { } func TestService_ServerTLSConfig(t *testing.T) { - // TODO(banks): it's mostly meaningless to test this now since we directly set - // the tlsCfg in our TestService helper which is all we'd be asserting on here - // not the actual implementation. Once agent tls fetching is built, it becomes - // more meaningful to actually verify it's returning the correct config. + require := require.New(t) + + a := agent.NewTestAgent("007", "") + defer a.Shutdown() + client := a.Client() + agent := client.Agent() + + // NewTestAgent setup a CA already by default + + // Register a local agent service with a managed proxy + reg := &api.AgentServiceRegistration{ + Name: "web", + Port: 8080, + } + err := agent.ServiceRegister(reg) + require.NoError(err) + + // Now we should be able to create a service that will eventually get it's TLS + // all by itself! + service, err := NewService("web", client) + require.NoError(err) + + // Wait for it to be ready + select { + case <-service.ReadyWait(): + // continue with test case below + case <-time.After(1 * time.Second): + t.Fatalf("timeout waiting for Service.ReadyWait after 1s") + } + + tlsCfg := service.ServerTLSConfig() + + // Sanity check it has a leaf with the right ServiceID and that validates with + // the given roots. + require.NotNil(tlsCfg.GetCertificate) + leaf, err := tlsCfg.GetCertificate(&tls.ClientHelloInfo{}) + require.NoError(err) + cert, err := x509.ParseCertificate(leaf.Certificate[0]) + require.NoError(err) + require.Len(cert.URIs, 1) + require.True(strings.HasSuffix(cert.URIs[0].String(), "/svc/web")) + + // Verify it as a client would + err = clientSideVerifier(tlsCfg, leaf.Certificate) + require.NoError(err) + + // Now test that rotating the root updates + { + // Setup a new generated CA + connect.TestCAConfigSet(t, a, nil) + } + + // After some time, both root and leaves should be different but both should + // still be correct. + oldRootSubjects := bytes.Join(tlsCfg.RootCAs.Subjects(), []byte(", ")) + //oldLeafSerial := connect.HexString(cert.SerialNumber.Bytes()) + oldLeafKeyID := connect.HexString(cert.SubjectKeyId) + retry.Run(t, func(r *retry.R) { + updatedCfg := service.ServerTLSConfig() + + // Wait until roots are different + rootSubjects := bytes.Join(updatedCfg.RootCAs.Subjects(), []byte(", ")) + if bytes.Equal(oldRootSubjects, rootSubjects) { + r.Fatalf("root certificates should have changed, got %s", + rootSubjects) + } + + leaf, err := updatedCfg.GetCertificate(&tls.ClientHelloInfo{}) + r.Check(err) + cert, err := x509.ParseCertificate(leaf.Certificate[0]) + r.Check(err) + + // TODO(banks): Current CA implementation resets the serial index when CA + // config changes which means same serial is issued by new CA config failing + // this test. Re-enable once the CA is changed to fix that. + + // if oldLeafSerial == connect.HexString(cert.SerialNumber.Bytes()) { + // r.Fatalf("leaf certificate should have changed, got serial %s", + // oldLeafSerial) + // } + if oldLeafKeyID == connect.HexString(cert.SubjectKeyId) { + r.Fatalf("leaf should have a different key, got matching SubjectKeyID = %s", + oldLeafKeyID) + } + }) } func TestService_HTTPClient(t *testing.T) { diff --git a/connect/testing.go b/connect/testing.go index 491036aaf3..073134b12b 100644 --- a/connect/testing.go +++ b/connect/testing.go @@ -8,6 +8,7 @@ import ( "log" "net" "net/http" + "os" "sync/atomic" "github.com/hashicorp/consul/agent/connect" @@ -20,16 +21,12 @@ import ( func TestService(t testing.T, service string, ca *structs.CARoot) *Service { t.Helper() - // Don't need to talk to client since we are setting TLSConfig locally. This - // will cause server verification to skip AuthZ too. - svc, err := NewService(service, nil) + // Don't need to talk to client since we are setting TLSConfig locally + svc, err := NewDevServiceWithTLSConfig(service, + log.New(os.Stderr, "", log.LstdFlags), TestTLSConfig(t, service, ca)) if err != nil { t.Fatal(err) } - - // Override the tlsConfig hackily. - svc.tlsCfg = newDynamicTLSConfig(TestTLSConfig(t, service, ca)) - return svc } diff --git a/connect/tls.go b/connect/tls.go index f5cb95a759..6f14cd787c 100644 --- a/connect/tls.go +++ b/connect/tls.go @@ -4,7 +4,9 @@ import ( "crypto/tls" "crypto/x509" "errors" + "fmt" "io/ioutil" + "log" "sync" "github.com/hashicorp/consul/agent/connect" @@ -104,7 +106,8 @@ func verifyServerCertMatchesURI(certs []*x509.Certificate, if cert.URIs[0].String() == expectedStr { return nil } - return errors.New("peer certificate mismatch") + return fmt.Errorf("peer certificate mismatch got %s, want %s", + cert.URIs[0].String(), expectedStr) } // newServerSideVerifier returns a verifierFunc that wraps the provided @@ -115,21 +118,25 @@ func newServerSideVerifier(client *api.Client, serviceID string) verifierFunc { return func(tlsCfg *tls.Config, rawCerts [][]byte) error { leaf, err := verifyChain(tlsCfg, rawCerts, false) if err != nil { + log.Printf("connect: failed TLS verification: %s", err) return err } // Check leaf is a cert we understand if len(leaf.URIs) < 1 { + log.Printf("connect: invalid leaf certificate") return errors.New("connect: invalid leaf certificate") } certURI, err := connect.ParseCertURI(leaf.URIs[0]) if err != nil { + log.Printf("connect: invalid leaf certificate URI") return errors.New("connect: invalid leaf certificate URI") } // No AuthZ if there is no client. if client == nil { + log.Printf("connect: nil client") return nil } @@ -148,9 +155,11 @@ func newServerSideVerifier(client *api.Client, serviceID string) verifierFunc { } resp, err := client.Agent().ConnectAuthorize(req) if err != nil { + log.Printf("connect: authz call failed: %s", err) return errors.New("connect: authz call failed: " + err.Error()) } if !resp.Authorized { + log.Printf("connect: authz call denied: %s", resp.Reason) return errors.New("connect: authz denied: " + resp.Reason) } return nil @@ -217,9 +226,17 @@ func verifyChain(tlsCfg *tls.Config, rawCerts [][]byte, client bool) (*x509.Cert type dynamicTLSConfig struct { base *tls.Config - sync.Mutex + sync.RWMutex leaf *tls.Certificate roots *x509.CertPool + // readyCh is closed when the config first gets both leaf and roots set. + // Watchers can wait on this via ReadyWait. + readyCh chan struct{} +} + +type tlsCfgUpdate struct { + ch chan struct{} + next *tlsCfgUpdate } // newDynamicTLSConfig returns a dynamicTLSConfig constructed from base. @@ -235,6 +252,9 @@ func newDynamicTLSConfig(base *tls.Config) *dynamicTLSConfig { if base.RootCAs != nil { cfg.roots = base.RootCAs } + if !cfg.Ready() { + cfg.readyCh = make(chan struct{}) + } return cfg } @@ -246,8 +266,8 @@ func newDynamicTLSConfig(base *tls.Config) *dynamicTLSConfig { // client can use this config for a long time and will still verify against the // latest roots even though the roots in the struct is has can't change. func (cfg *dynamicTLSConfig) Get(v verifierFunc) *tls.Config { - cfg.Lock() - defer cfg.Unlock() + cfg.RLock() + defer cfg.RUnlock() copy := cfg.base.Clone() copy.RootCAs = cfg.roots copy.ClientCAs = cfg.roots @@ -281,6 +301,7 @@ func (cfg *dynamicTLSConfig) SetRoots(roots *x509.CertPool) error { cfg.Lock() defer cfg.Unlock() cfg.roots = roots + cfg.notify() return nil } @@ -289,19 +310,43 @@ func (cfg *dynamicTLSConfig) SetLeaf(leaf *tls.Certificate) error { cfg.Lock() defer cfg.Unlock() cfg.leaf = leaf + cfg.notify() return nil } +// notify is called under lock during an update to check if we are now ready. +func (cfg *dynamicTLSConfig) notify() { + if cfg.readyCh != nil && cfg.leaf != nil && cfg.roots != nil { + close(cfg.readyCh) + cfg.readyCh = nil + } +} + // Roots returns the current CA root CertPool. func (cfg *dynamicTLSConfig) Roots() *x509.CertPool { - cfg.Lock() - defer cfg.Unlock() + cfg.RLock() + defer cfg.RUnlock() return cfg.roots } // Leaf returns the current Leaf certificate. func (cfg *dynamicTLSConfig) Leaf() *tls.Certificate { - cfg.Lock() - defer cfg.Unlock() + cfg.RLock() + defer cfg.RUnlock() return cfg.leaf } + +// Ready returns whether or not both roots and a leaf certificate are +// configured. If both are non-nil, they are assumed to be valid and usable. +func (cfg *dynamicTLSConfig) Ready() bool { + cfg.RLock() + defer cfg.RUnlock() + return cfg.leaf != nil && cfg.roots != nil +} + +// ReadyWait returns a chan that is closed when the the tlsConfig becomes Ready +// for use. Note that if the config is ready when it is called it returns a nil +// chan. +func (cfg *dynamicTLSConfig) ReadyWait() <-chan struct{} { + return cfg.readyCh +} diff --git a/connect/tls_test.go b/connect/tls_test.go index aa1063f3e8..a9fd6fe8c1 100644 --- a/connect/tls_test.go +++ b/connect/tls_test.go @@ -358,3 +358,45 @@ func TestDynamicTLSConfig(t *testing.T) { requireCorrectVerifier(t, newCfg, gotBefore, v1Ch) requireCorrectVerifier(t, newCfg, gotAfter, v2Ch) } + +func TestDynamicTLSConfig_Ready(t *testing.T) { + require := require.New(t) + + ca1 := connect.TestCA(t, nil) + baseCfg := TestTLSConfig(t, "web", ca1) + + c := newDynamicTLSConfig(defaultTLSConfig()) + readyCh := c.ReadyWait() + assertBlocked(t, readyCh) + require.False(c.Ready(), "no roots or leaf, should not be ready") + + err := c.SetLeaf(&baseCfg.Certificates[0]) + require.NoError(err) + assertBlocked(t, readyCh) + require.False(c.Ready(), "no roots, should not be ready") + + err = c.SetRoots(baseCfg.RootCAs) + require.NoError(err) + assertNotBlocked(t, readyCh) + require.True(c.Ready(), "should be ready") +} + +func assertBlocked(t *testing.T, ch <-chan struct{}) { + t.Helper() + select { + case <-ch: + t.Fatalf("want blocked chan") + default: + return + } +} + +func assertNotBlocked(t *testing.T, ch <-chan struct{}) { + t.Helper() + select { + case <-ch: + return + default: + t.Fatalf("want unblocked chan but it blocked") + } +} diff --git a/testutil/server.go b/testutil/server.go index 06c0fdfd28..f188079d70 100644 --- a/testutil/server.go +++ b/testutil/server.go @@ -17,6 +17,7 @@ import ( "fmt" "io" "io/ioutil" + "log" "net" "net/http" "os" @@ -94,6 +95,7 @@ type TestServerConfig struct { VerifyIncomingHTTPS bool `json:"verify_incoming_https,omitempty"` VerifyOutgoing bool `json:"verify_outgoing,omitempty"` EnableScriptChecks bool `json:"enable_script_checks,omitempty"` + Connect map[string]interface{} `json:"connect,omitempty"` ReadyTimeout time.Duration `json:"-"` Stdout, Stderr io.Writer `json:"-"` Args []string `json:"-"` @@ -211,6 +213,7 @@ func newTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, e return nil, errors.Wrap(err, "failed marshaling json") } + log.Printf("CONFIG JSON: %s", string(b)) configFile := filepath.Join(tmpdir, "config.json") if err := ioutil.WriteFile(configFile, b, 0644); err != nil { defer os.RemoveAll(tmpdir) diff --git a/watch/funcs.go b/watch/funcs.go index 5e72e40a66..3b1b854ed8 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -236,8 +236,7 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) { // connectRootsWatch is used to watch for changes to Connect Root certificates. func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) { - // We don't support stale since roots are likely to be cached locally in the - // agent anyway. + // We don't support stale since roots are cached locally in the agent. fn := func(p *Plan) (BlockingParamVal, interface{}, error) { agent := p.client.Agent() @@ -257,8 +256,7 @@ func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) { // connectLeafWatch is used to watch for changes to Connect Leaf certificates // for given local service id. func connectLeafWatch(params map[string]interface{}) (WatcherFunc, error) { - // We don't support stale since certs are likely to be cached locally in the - // agent anyway. + // We don't support stale since certs are cached locally in the agent. var serviceID string if err := assignValue(params, "service_id", &serviceID); err != nil { diff --git a/watch/funcs_test.go b/watch/funcs_test.go index d5253de444..b304a803fe 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -7,8 +7,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/hashicorp/consul/agent" - "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/connect" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/watch" "github.com/stretchr/testify/require" @@ -526,14 +528,12 @@ func TestEventWatch(t *testing.T) { } func TestConnectRootsWatch(t *testing.T) { - // TODO(banks) enable and make it work once this is supported. Note that this - // test actually passes currently just by busy-polling the roots endpoint - // until it changes. - t.Skip("CA and Leaf implementation don't actually support blocking yet") t.Parallel() - a := agent.NewTestAgent(t.Name(), ``) + // NewTestAgent will bootstrap a new CA + a := agent.NewTestAgent(t.Name(), "") defer a.Shutdown() + var originalCAID string invoke := makeInvokeCh() plan := mustParse(t, `{"type":"connect_roots"}`) plan.Handler = func(idx uint64, raw interface{}) { @@ -544,7 +544,14 @@ func TestConnectRootsWatch(t *testing.T) { if !ok || v == nil { return // ignore } - // TODO(banks): verify the right roots came back. + // Only 1 CA is the bootstrapped state (i.e. first response). Ignore this + // state and wait for the new CA to show up too. + if len(v.Roots) == 1 { + originalCAID = v.ActiveRootID + return + } + assert.NotEmpty(t, originalCAID) + assert.NotEqual(t, originalCAID, v.ActiveRootID) invoke <- nil } @@ -553,22 +560,8 @@ func TestConnectRootsWatch(t *testing.T) { go func() { defer wg.Done() time.Sleep(20 * time.Millisecond) - // TODO(banks): this is a hack since CA config is in flux. We _did_ expose a - // temporary agent endpoint for PUTing config, but didn't expose it in `api` - // package intentionally. If we are going to hack around with temporary API, - // we can might as well drop right down to the RPC level... - args := structs.CAConfiguration{ - Provider: "static", - Config: map[string]interface{}{ - "Name": "test-1", - "Generate": true, - }, - } - var reply interface{} - if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil { - t.Fatalf("err: %v", err) - } - + // Set a new CA + connect.TestCAConfigSet(t, a, nil) }() wg.Add(1) @@ -588,9 +581,8 @@ func TestConnectRootsWatch(t *testing.T) { } func TestConnectLeafWatch(t *testing.T) { - // TODO(banks) enable and make it work once this is supported. - t.Skip("CA and Leaf implementation don't actually support blocking yet") t.Parallel() + // NewTestAgent will bootstrap a new CA a := agent.NewTestAgent(t.Name(), ``) defer a.Shutdown() @@ -606,25 +598,10 @@ func TestConnectLeafWatch(t *testing.T) { require.Nil(t, err) } - // Setup a new generated CA - // - // TODO(banks): this is a hack since CA config is in flux. We _did_ expose a - // temporary agent endpoint for PUTing config, but didn't expose it in `api` - // package intentionally. If we are going to hack around with temporary API, - // we can might as well drop right down to the RPC level... - args := structs.CAConfiguration{ - Provider: "static", - Config: map[string]interface{}{ - "Name": "test-1", - "Generate": true, - }, - } - var reply interface{} - if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil { - t.Fatalf("err: %v", err) - } + var lastCert *consulapi.LeafCert - invoke := makeInvokeCh() + //invoke := makeInvokeCh() + invoke := make(chan error) plan := mustParse(t, `{"type":"connect_leaf", "service_id":"web"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { @@ -634,7 +611,18 @@ func TestConnectLeafWatch(t *testing.T) { if !ok || v == nil { return // ignore } - // TODO(banks): verify the right leaf came back. + if lastCert == nil { + // Initial fetch, just store the cert and return + lastCert = v + return + } + // TODO(banks): right now the root rotation actually causes Serial numbers + // to reset so these end up all being the same. That needs fixing but it's + // a bigger task than I want to bite off for this PR. + //assert.NotEqual(t, lastCert.SerialNumber, v.SerialNumber) + assert.NotEqual(t, lastCert.CertPEM, v.CertPEM) + assert.NotEqual(t, lastCert.PrivateKeyPEM, v.PrivateKeyPEM) + assert.NotEqual(t, lastCert.ModifyIndex, v.ModifyIndex) invoke <- nil } @@ -643,20 +631,8 @@ func TestConnectLeafWatch(t *testing.T) { go func() { defer wg.Done() time.Sleep(20 * time.Millisecond) - - // Change the CA which should eventually trigger a leaf change but probably - // won't now so this test has no way to succeed yet. - args := structs.CAConfiguration{ - Provider: "static", - Config: map[string]interface{}{ - "Name": "test-2", - "Generate": true, - }, - } - var reply interface{} - if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil { - t.Fatalf("err: %v", err) - } + // Change the CA to trigger a leaf change + connect.TestCAConfigSet(t, a, nil) }() wg.Add(1) @@ -740,6 +716,7 @@ func TestConnectProxyConfigWatch(t *testing.T) { } func mustParse(t *testing.T, q string) *watch.Plan { + t.Helper() var params map[string]interface{} if err := json.Unmarshal([]byte(q), ¶ms); err != nil { t.Fatal(err)