mirror of https://github.com/k3s-io/k3s
Handle loadbalancer port in TIME_WAIT
If the port wanted by the client load balancer is in TIME_WAIT, startup will fail. Set SO_REUSEPORT so that it can be listened on again immediately. The configurable Listen call wants a context, so plumb that through as well. Signed-off-by: Brad Davidson <brad.davidson@rancher.com>pull/3039/head
parent
7cdfaad6ce
commit
c0d129003b
|
@ -39,7 +39,7 @@ const (
|
|||
|
||||
func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) *config.Node {
|
||||
for {
|
||||
agentConfig, err := get(&agent, proxy)
|
||||
agentConfig, err := get(ctx, &agent, proxy)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to retrieve agent config: %v", err)
|
||||
select {
|
||||
|
@ -293,7 +293,7 @@ func locateOrGenerateResolvConf(envInfo *cmds.Agent) string {
|
|||
return tmpConf
|
||||
}
|
||||
|
||||
func get(envInfo *cmds.Agent, proxy proxy.Proxy) (*config.Node, error) {
|
||||
func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.Node, error) {
|
||||
if envInfo.Debug {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
}
|
||||
|
@ -310,7 +310,7 @@ func get(envInfo *cmds.Agent, proxy proxy.Proxy) (*config.Node, error) {
|
|||
|
||||
// If the supervisor and externally-facing apiserver are not on the same port, tell the proxy where to find the apiserver.
|
||||
if controlConfig.SupervisorPort != controlConfig.HTTPSPort {
|
||||
if err := proxy.SetAPIServerPort(controlConfig.HTTPSPort); err != nil {
|
||||
if err := proxy.SetAPIServerPort(ctx, controlConfig.HTTPSPort); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to setup access to API Server port %d on at %s", controlConfig.HTTPSPort, proxy.SupervisorURL())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,10 +7,12 @@ import (
|
|||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/google/tcpproxy"
|
||||
"github.com/rancher/k3s/pkg/version"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
type LoadBalancer struct {
|
||||
|
@ -38,8 +40,9 @@ var (
|
|||
ETCDServerServiceName = version.Program + "-etcd-server-load-balancer"
|
||||
)
|
||||
|
||||
func New(dataDir, serviceName, serverURL string, lbServerPort int) (_lb *LoadBalancer, _err error) {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(lbServerPort))
|
||||
func New(ctx context.Context, dataDir, serviceName, serverURL string, lbServerPort int) (_lb *LoadBalancer, _err error) {
|
||||
config := net.ListenConfig{Control: reusePort}
|
||||
listener, err := config.Listen(ctx, "tcp", "127.0.0.1:"+strconv.Itoa(lbServerPort))
|
||||
defer func() {
|
||||
if _err != nil {
|
||||
logrus.Warnf("Error starting load balancer: %s", _err)
|
||||
|
@ -153,3 +156,9 @@ func onDialError(src net.Conn, dstDialErr error) {
|
|||
logrus.Debugf("Incoming conn %v, error dialing load balancer servers: %v", src.RemoteAddr().String(), dstDialErr)
|
||||
src.Close()
|
||||
}
|
||||
|
||||
func reusePort(network, address string, conn syscall.RawConn) error {
|
||||
return conn.Control(func(descriptor uintptr) {
|
||||
syscall.SetsockoptInt(int(descriptor), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package loadbalancer
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
@ -105,7 +106,7 @@ func TestFailOver(t *testing.T) {
|
|||
DataDir: tmpDir,
|
||||
}
|
||||
|
||||
lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort)
|
||||
lb, err := New(context.TODO(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort)
|
||||
if err != nil {
|
||||
assertEqual(t, err, nil)
|
||||
}
|
||||
|
@ -156,7 +157,7 @@ func TestFailFast(t *testing.T) {
|
|||
DataDir: tmpDir,
|
||||
}
|
||||
|
||||
lb, err := New(cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort)
|
||||
lb, err := New(context.TODO(), cfg.DataDir, SupervisorServiceName, cfg.ServerURL, RandomPort)
|
||||
if err != nil {
|
||||
assertEqual(t, err, nil)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
sysnet "net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
@ -13,7 +14,7 @@ import (
|
|||
|
||||
type Proxy interface {
|
||||
Update(addresses []string)
|
||||
SetAPIServerPort(port int) error
|
||||
SetAPIServerPort(ctx context.Context, port int) error
|
||||
SupervisorURL() string
|
||||
SupervisorAddresses() []string
|
||||
APIServerURL() string
|
||||
|
@ -27,7 +28,7 @@ type Proxy interface {
|
|||
// NOTE: This is a proxy in the API sense - it returns either actual server URLs, or the URL of the
|
||||
// local load-balancer. It is not actually responsible for proxying requests at the network level;
|
||||
// this is handled by the load-balancers that the proxy optionally steers connections towards.
|
||||
func NewSupervisorProxy(lbEnabled bool, dataDir, supervisorURL string, lbServerPort int) (Proxy, error) {
|
||||
func NewSupervisorProxy(ctx context.Context, lbEnabled bool, dataDir, supervisorURL string, lbServerPort int) (Proxy, error) {
|
||||
p := proxy{
|
||||
lbEnabled: lbEnabled,
|
||||
dataDir: dataDir,
|
||||
|
@ -38,7 +39,7 @@ func NewSupervisorProxy(lbEnabled bool, dataDir, supervisorURL string, lbServerP
|
|||
}
|
||||
|
||||
if lbEnabled {
|
||||
lb, err := loadbalancer.New(dataDir, loadbalancer.SupervisorServiceName, supervisorURL, p.lbServerPort)
|
||||
lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.SupervisorServiceName, supervisorURL, p.lbServerPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -108,7 +109,7 @@ func (p *proxy) setSupervisorPort(addresses []string) []string {
|
|||
// load-balancing is enabled, another load-balancer is started on a port one below the supervisor
|
||||
// load-balancer, and the address of this load-balancer is returned instead of the actual apiserver
|
||||
// addresses.
|
||||
func (p *proxy) SetAPIServerPort(port int) error {
|
||||
func (p *proxy) SetAPIServerPort(ctx context.Context, port int) error {
|
||||
u, err := url.Parse(p.initialSupervisorURL)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to parse server URL %s", p.initialSupervisorURL)
|
||||
|
@ -123,7 +124,7 @@ func (p *proxy) SetAPIServerPort(port int) error {
|
|||
if lbServerPort != 0 {
|
||||
lbServerPort = lbServerPort - 1
|
||||
}
|
||||
lb, err := loadbalancer.New(p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort)
|
||||
lb, err := loadbalancer.New(ctx, p.dataDir, loadbalancer.APIServerServiceName, p.apiServerURL, lbServerPort)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -148,7 +148,7 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
|
|||
return err
|
||||
}
|
||||
|
||||
proxy, err := proxy.NewSupervisorProxy(!cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort)
|
||||
proxy, err := proxy.NewSupervisorProxy(ctx, !cfg.DisableLoadBalancer, agentDir, cfg.ServerURL, cfg.LBServerPort)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
|
|||
clientURL.Host = clientURL.Hostname() + ":2379"
|
||||
clientURLs = append(clientURLs, clientURL.String())
|
||||
}
|
||||
etcdProxy, err := etcd.NewETCDProxy(true, c.config.DataDir, clientURLs[0])
|
||||
etcdProxy, err := etcd.NewETCDProxy(ctx, true, c.config.DataDir, clientURLs[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
@ -16,7 +17,7 @@ type Proxy interface {
|
|||
|
||||
// NewETCDProxy initializes a new proxy structure that contain a load balancer
|
||||
// which listens on port 2379 and proxy between etcd cluster members
|
||||
func NewETCDProxy(enabled bool, dataDir, etcdURL string) (Proxy, error) {
|
||||
func NewETCDProxy(ctx context.Context, enabled bool, dataDir, etcdURL string) (Proxy, error) {
|
||||
u, err := url.Parse(etcdURL)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to parse etcd client URL")
|
||||
|
@ -29,7 +30,7 @@ func NewETCDProxy(enabled bool, dataDir, etcdURL string) (Proxy, error) {
|
|||
}
|
||||
|
||||
if enabled {
|
||||
lb, err := loadbalancer.New(dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379)
|
||||
lb, err := loadbalancer.New(ctx, dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue