From c0d129003b3dc3989aa1bbf36a4942e2f688b1b1 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Mon, 8 Mar 2021 14:10:00 -0800 Subject: [PATCH] Handle loadbalancer port in TIME_WAIT If the port wanted by the client load balancer is in TIME_WAIT, startup will fail. Set SO_REUSEPORT so that it can be listened on again immediately. The configurable Listen call wants a context, so plumb that through as well. Signed-off-by: Brad Davidson --- pkg/agent/config/config.go | 6 +++--- pkg/agent/loadbalancer/loadbalancer.go | 13 +++++++++++-- pkg/agent/loadbalancer/loadbalancer_test.go | 5 +++-- pkg/agent/proxy/apiproxy.go | 11 ++++++----- pkg/agent/run.go | 2 +- pkg/cluster/cluster.go | 2 +- pkg/etcd/etcdproxy.go | 5 +++-- 7 files changed, 28 insertions(+), 16 deletions(-) diff --git a/pkg/agent/config/config.go b/pkg/agent/config/config.go index 8e5d25af2f..2b20a543aa 100644 --- a/pkg/agent/config/config.go +++ b/pkg/agent/config/config.go @@ -39,7 +39,7 @@ const ( func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) *config.Node { for { - agentConfig, err := get(&agent, proxy) + agentConfig, err := get(ctx, &agent, proxy) if err != nil { logrus.Errorf("Failed to retrieve agent config: %v", err) select { @@ -293,7 +293,7 @@ func locateOrGenerateResolvConf(envInfo *cmds.Agent) string { return tmpConf } -func get(envInfo *cmds.Agent, proxy proxy.Proxy) (*config.Node, error) { +func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.Node, error) { if envInfo.Debug { logrus.SetLevel(logrus.DebugLevel) } @@ -310,7 +310,7 @@ func get(envInfo *cmds.Agent, proxy proxy.Proxy) (*config.Node, error) { // If the supervisor and externally-facing apiserver are not on the same port, tell the proxy where to find the apiserver. if controlConfig.SupervisorPort != controlConfig.HTTPSPort { - if err := proxy.SetAPIServerPort(controlConfig.HTTPSPort); err != nil { + if err := proxy.SetAPIServerPort(ctx, controlConfig.HTTPSPort); err != nil { return nil, errors.Wrapf(err, "failed to setup access to API Server port %d on at %s", controlConfig.HTTPSPort, proxy.SupervisorURL()) } } diff --git a/pkg/agent/loadbalancer/loadbalancer.go b/pkg/agent/loadbalancer/loadbalancer.go index 88f1f762f6..457dc2ac4c 100644 --- a/pkg/agent/loadbalancer/loadbalancer.go +++ b/pkg/agent/loadbalancer/loadbalancer.go @@ -7,10 +7,12 @@ import ( "path/filepath" "strconv" "sync" + "syscall" "github.com/google/tcpproxy" "github.com/rancher/k3s/pkg/version" "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" ) type LoadBalancer struct { @@ -38,8 +40,9 @@ var ( ETCDServerServiceName = version.Program + "-etcd-server-load-balancer" ) -func New(dataDir, serviceName, serverURL string, lbServerPort int) (_lb *LoadBalancer, _err error) { - listener, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(lbServerPort)) +func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPort int) (_lb *LoadBalancer, _err error) { + config := net.ListenConfig{Control: reusePort} + listener, err := config.Listen(ctx, "tcp", "127.0.0.1:"+strconv.Itoa(lbServerPort)) defer func() { if _err != nil { logrus.Warnf("Error starting load balancer: %s", _err) @@ -153,3 +156,9 @@ func onDialError(src net.Conn, dstDialErr error) { logrus.Debugf("Incoming conn %v, error dialing load balancer servers: %v", src.RemoteAddr().String(), dstDialErr) src.Close() } + +func reusePort(network, address string, conn syscall.RawConn) error { + return conn.Control(func(descriptor uintptr) { + syscall.SetsockoptInt(int(descriptor), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + }) +} diff --git a/pkg/agent/loadbalancer/loadbalancer_test.go b/pkg/agent/loadbalancer/loadbalancer_test.go index 44326366d9..82db1157df 100644 --- a/pkg/agent/loadbalancer/loadbalancer_test.go +++ b/pkg/agent/loadbalancer/loadbalancer_test.go @@ -2,6 +2,7 @@ package loadbalancer import ( "bufio" + "context" "errors" "fmt" "io/ioutil" @@ -105,7 +106,7 @@ func TestFailOver(t *testing.T) { DataDir: tmpDir, } - lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort) + lb, err := New(context.TODO(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort) if err != nil { assertEqual(t, err, nil) } @@ -156,7 +157,7 @@ func TestFailFast(t *testing.T) { DataDir: tmpDir, } - lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort) + lb, err := New(context.TODO(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort) if err != nil { assertEqual(t, err, nil) } diff --git a/pkg/agent/proxy/apiproxy.go b/pkg/agent/proxy/apiproxy.go index 2efc6f5590..f1c7a916ee 100644 --- a/pkg/agent/proxy/apiproxy.go +++ b/pkg/agent/proxy/apiproxy.go @@ -1,6 +1,7 @@ package proxy import ( + "context" sysnet "net" "net/url" "strconv" @@ -13,7 +14,7 @@ import ( type Proxy interface { Update(addresses []string) - SetAPIServerPort(port int) error + SetAPIServerPort(ctx context.Context, port int) error SupervisorURL() string SupervisorAddresses() []string APIServerURL() string @@ -27,7 +28,7 @@ type Proxy interface { // NOTE: This is a proxy in the API sense - it returns either actual server URLs, or the URL of the // local load-balancer. It is not actually responsible for proxying requests at the network level; // this is handled by the load-balancers that the proxy optionally steers connections towards. -func NewSupervisorProxy(lbEnabled bool, dataDir, supervisorURL string, lbServerPort int) (Proxy, error) { +func NewSupervisorProxy(ctx context.Context, lbEnabled bool, dataDir, supervisorURL string, lbServerPort int) (Proxy, error) { p := proxy{ lbEnabled: lbEnabled, dataDir: dataDir, @@ -38,7 +39,7 @@ func NewSupervisorProxy(lbEnabled bool, dataDir, supervisorURL string, lbServerP } if lbEnabled { - lb, err := loadbalancer.New(dataDir, loadbalancer.SupervisorServiceName, supervisorURL, p.lbServerPort) + lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.SupervisorServiceName, supervisorURL, p.lbServerPort) if err != nil { return nil, err } @@ -108,7 +109,7 @@ func (p *proxy) setSupervisorPort(addresses []string) []string { // load-balancing is enabled, another load-balancer is started on a port one below the supervisor // load-balancer, and the address of this load-balancer is returned instead of the actual apiserver // addresses. -func (p *proxy) SetAPIServerPort(port int) error { +func (p *proxy) SetAPIServerPort(ctx context.Context, port int) error { u, err := url.Parse(p.initialSupervisorURL) if err != nil { return errors.Wrapf(err, "failed to parse server URL %s", p.initialSupervisorURL) @@ -123,7 +124,7 @@ func (p *proxy) SetAPIServerPort(port int) error { if lbServerPort != 0 { lbServerPort = lbServerPort - 1 } - lb, err := loadbalancer.New(p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort) + lb, err := loadbalancer.New(ctx, p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort) if err != nil { return err } diff --git a/pkg/agent/run.go b/pkg/agent/run.go index aefb347065..d25f42970e 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -148,7 +148,7 @@ func Run(ctx context.Context, cfg cmds.Agent) error { return err } - proxy, err := proxy.NewSupervisorProxy(!cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort) + proxy, err := proxy.NewSupervisorProxy(ctx, !cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort) if err != nil { return err } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 76f725df33..801cf6a67b 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -54,7 +54,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { clientURL.Host = clientURL.Hostname() + ":2379" clientURLs = append(clientURLs, clientURL.String()) } - etcdProxy, err := etcd.NewETCDProxy(true, c.config.DataDir, clientURLs[0]) + etcdProxy, err := etcd.NewETCDProxy(ctx, true, c.config.DataDir, clientURLs[0]) if err != nil { return nil, err } diff --git a/pkg/etcd/etcdproxy.go b/pkg/etcd/etcdproxy.go index 0d7ad801dd..fe61a4a13d 100644 --- a/pkg/etcd/etcdproxy.go +++ b/pkg/etcd/etcdproxy.go @@ -1,6 +1,7 @@ package etcd import ( + "context" "net/url" "github.com/pkg/errors" @@ -16,7 +17,7 @@ type Proxy interface { // NewETCDProxy initializes a new proxy structure that contain a load balancer // which listens on port 2379 and proxy between etcd cluster members -func NewETCDProxy(enabled bool, dataDir, etcdURL string) (Proxy, error) { +func NewETCDProxy(ctx context.Context, enabled bool, dataDir, etcdURL string) (Proxy, error) { u, err := url.Parse(etcdURL) if err != nil { return nil, errors.Wrap(err, "failed to parse etcd client URL") @@ -29,7 +30,7 @@ func NewETCDProxy(enabled bool, dataDir, etcdURL string) (Proxy, error) { } if enabled { - lb, err := loadbalancer.New(dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379) + lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379) if err != nil { return nil, err }