You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/connect/proxy/proxy.go

165 lines
4.5 KiB

package proxy
import (
"crypto/x509"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect"
"github.com/hashicorp/consul/lib"
)
// Proxy implements the built-in connect proxy.
//
// A Proxy is created with New, driven by Serve and shut down with Close.
type Proxy struct {
	// client is the Consul API client supplied to New. Serve passes it on when
	// building the service instance and the upstream listeners.
	client *api.Client

	// cfgWatcher is the source of configuration; Serve receives each new
	// configuration from its Watch channel.
	cfgWatcher ConfigWatcher

	// stopChan is closed by Close to signal Serve and every started listener
	// to shut down.
	stopChan chan struct{}

	// logger receives all log output produced by the proxy.
	logger hclog.Logger

	// service is set by Serve once the first configuration has been received,
	// and is closed by Close if it has been set.
	service *connect.Service
}
// New builds a Proxy that takes its configuration from cw.
//
// The ConfigWatcher is the channel through which configuration reaches the
// proxy: every time it delivers a new configuration, Serve reconfigures the
// proxy accordingly. The returned error is always nil.
func New(client *api.Client, cw ConfigWatcher, logger hclog.Logger) (*Proxy, error) {
	p := &Proxy{
		client:     client,
		cfgWatcher: cw,
		logger:     logger,
		stopChan:   make(chan struct{}),
	}
	return p, nil
}
// Serve the proxy instance until a fatal error occurs or proxy is closed.
func (p *Proxy) Serve() error {
var cfg *Config
// failCh is used to stop Serve and return an error from another goroutine we
// spawn.
failCh := make(chan error, 1)
// Watch for config changes (initial setup happens on first "change")
for {
select {
case err := <-failCh:
// don't log here, we can log with better context at the point where we
// write the err to the chan
return err
case newCfg := <-p.cfgWatcher.Watch():
p.logger.Debug("got new config")
if cfg == nil {
// Initial setup
// Setup telemetry if configured
// NOTE(kit): As far as I can tell, all of the metrics in the proxy are generated at runtime, so we
// don't have any static metrics we initialize at start.
_, err := lib.InitTelemetry(newCfg.Telemetry)
if err != nil {
p.logger.Error("proxy telemetry config error", "error", err)
}
// Setup Service instance now we know target ID etc
service, err := newCfg.Service(p.client, p.logger)
if err != nil {
return err
}
p.service = service
go func() {
<-service.ReadyWait()
p.logger.Info("Proxy loaded config and ready to serve")
tcfg := service.ServerTLSConfig()
cert, _ := tcfg.GetCertificate(nil)
leaf, _ := x509.ParseCertificate(cert.Certificate[0])
roots, err := connect.CommonNamesFromCertPool(tcfg.RootCAs)
if err != nil {
p.logger.Error("Failed to parse root subjects", "error", err)
} else {
p.logger.Info("Parsed TLS identity", "uri", leaf.URIs[0], "roots", roots)
}
// Only start a listener if we have a port set. This allows
// the configuration to disable our public listener.
if newCfg.PublicListener.BindPort != 0 {
newCfg.PublicListener.applyDefaults()
l := NewPublicListener(p.service, newCfg.PublicListener, p.logger)
err = p.startListener("public listener", l)
if err != nil {
// This should probably be fatal.
p.logger.Error("failed to start public listener", "error", err)
failCh <- err
}
}
}()
}
// TODO(banks) update/remove upstreams properly based on a diff with current. Can
// store a map of uc.String() to Listener here and then use it to only
// start one of each and stop/modify if changes occur.
for _, uc := range newCfg.Upstreams {
uc.applyDefaults()
if uc.LocalBindSocketPath != "" {
p.logger.Error("local_bind_socket_path is not supported with this proxy implementation. "+
"Can't start upstream.", "upstream", uc.String())
continue
}
if uc.LocalBindPort < 1 {
p.logger.Error("upstream has no local_bind_port. "+
"Can't start upstream.", "upstream", uc.String())
continue
}
Add Proxy Upstreams to Service Definition (#4639) * Refactor Service Definition ProxyDestination. This includes: - Refactoring all internal structs used - Updated tests for both deprecated and new input for: - Agent Services endpoint response - Agent Service endpoint response - Agent Register endpoint - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Register - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Services endpoint response - Catalog Node endpoint response - Catalog Service endpoint response - Updated API tests for all of the above too (both deprecated and new forms of register) TODO: - config package changes for on-disk service definitions - proxy config endpoint - built-in proxy support for new fields * Agent proxy config endpoint updated with upstreams * Config file changes for upstreams. * Add upstream opaque config and update all tests to ensure it works everywhere. * Built in proxy working with new Upstreams config * Command fixes and deprecations * Fix key translation, upstream type defaults and a spate of other subtele bugs found with ned to end test scripts... TODO: tests still failing on one case that needs a fix. I think it's key translation for upstreams nested in Managed proxy struct. * Fix translated keys in API registration. ≈ * Fixes from docs - omit some empty undocumented fields in API - Bring back ServiceProxyDestination in Catalog responses to not break backwards compat - this was removed assuming it was only used internally. * Documentation updates for Upstreams in service definition * Fixes for tests broken by many refactors. * Enable travis on f-connect branch in this branch too. * Add consistent Deprecation comments to ProxyDestination uses * Update version number on deprecation notices, and correct upstream datacenter field with explanation in docs
6 years ago
l := NewUpstreamListener(p.service, p.client, uc, p.logger)
err := p.startListener(uc.String(), l)
if err != nil {
p.logger.Error("failed to start upstream",
"upstream", uc.String(),
"error", err,
)
}
}
cfg = newCfg
case <-p.stopChan:
return nil
}
}
}
// startListener launches l in the background and arranges for it to be closed
// when the proxy stops. It is run from the internal state machine loop.
//
// name is used only to label log lines. One goroutine runs l.Serve and logs
// how it ended; a second waits for stopChan to be closed and then closes the
// listener. The returned error is always nil.
func (p *Proxy) startListener(name string, l *Listener) error {
	p.logger.Info("Starting listener", "listener", name, "bind_addr", l.BindAddr())

	// Serve until the listener stops, then report why.
	go func() {
		if serveErr := l.Serve(); serveErr != nil {
			p.logger.Error("listener stopped with error", "listener", name, "error", serveErr)
			return
		}
		p.logger.Info("listener stopped", "listener", name)
	}()

	// Tear the listener down once the proxy is closed.
	go func() {
		<-p.stopChan
		l.Close()
	}()

	return nil
}
// Close stops the proxy and terminates all active connections. It must be
// called only once: it closes stopChan, and closing an already-closed channel
// panics.
//
// If Serve has already created the service instance, that is closed as well.
func (p *Proxy) Close() {
	close(p.stopChan)

	if svc := p.service; svc != nil {
		svc.Close()
	}
}