From a17e336993dce9f0bede53f311c51c5ac31e9b03 Mon Sep 17 00:00:00 2001 From: Erik Wilson Date: Wed, 24 Jul 2019 00:22:31 -0700 Subject: [PATCH 1/2] Use go tcpproxy --- pkg/agent/loadbalancer/config.go | 38 ++++ pkg/agent/loadbalancer/loadbalancer.go | 142 +++++++++++++++ pkg/agent/loadbalancer/loadbalancer_test.go | 183 ++++++++++++++++++++ pkg/agent/loadbalancer/servers.go | 57 ++++++ pkg/agent/loadbalancer/utility.go | 49 ++++++ pkg/agent/run.go | 17 +- pkg/agent/tunnel/tunnel.go | 8 +- pkg/cli/cmds/agent.go | 1 + pkg/cli/server/server.go | 1 + 9 files changed, 491 insertions(+), 5 deletions(-) create mode 100644 pkg/agent/loadbalancer/config.go create mode 100644 pkg/agent/loadbalancer/loadbalancer.go create mode 100644 pkg/agent/loadbalancer/loadbalancer_test.go create mode 100644 pkg/agent/loadbalancer/servers.go create mode 100644 pkg/agent/loadbalancer/utility.go diff --git a/pkg/agent/loadbalancer/config.go b/pkg/agent/loadbalancer/config.go new file mode 100644 index 0000000000..9c3ce95098 --- /dev/null +++ b/pkg/agent/loadbalancer/config.go @@ -0,0 +1,38 @@ +package loadbalancer + +import ( + "encoding/json" + "io/ioutil" + + "github.com/rancher/k3s/pkg/agent/util" +) + +func (lb *LoadBalancer) writeConfig() error { + configOut, err := json.MarshalIndent(lb, "", " ") + if err != nil { + return err + } + if err := util.WriteFile(lb.configFile, string(configOut)); err != nil { + return err + } + return nil +} + +func (lb *LoadBalancer) updateConfig() error { + writeConfig := true + if configBytes, err := ioutil.ReadFile(lb.configFile); err == nil { + config := &LoadBalancer{} + if err := json.Unmarshal(configBytes, config); err == nil { + if config.ServerURL == lb.ServerURL { + writeConfig = false + lb.setServers(config.ServerAddresses) + } + } + } + if writeConfig { + if err := lb.writeConfig(); err != nil { + return err + } + } + return nil +} diff --git a/pkg/agent/loadbalancer/loadbalancer.go b/pkg/agent/loadbalancer/loadbalancer.go new file mode 100644 index 0000000000..b15b6154aa --- /dev/null +++ b/pkg/agent/loadbalancer/loadbalancer.go @@ -0,0 +1,142 @@ +package loadbalancer + +import ( + "context" + "errors" + "net" + "path/filepath" + "sync" + + "github.com/google/tcpproxy" + "github.com/rancher/k3s/pkg/cli/cmds" + "github.com/sirupsen/logrus" +) + +type LoadBalancer struct { + mutex sync.Mutex + dialer *net.Dialer + proxy *tcpproxy.Proxy + + configFile string + localAddress string + localServerURL string + originalServerAddress string + ServerURL string + ServerAddresses []string + randomServers []string + currentServerAddress string + nextServerIndex int +} + +const ( + serviceName = "k3s-agent-load-balancer" +) + +func Setup(ctx context.Context, cfg cmds.Agent) (_lb *LoadBalancer, _err error) { + if cfg.DisableLoadBalancer { + return nil, nil + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + defer func() { + if _err != nil { + logrus.Warnf("Error starting load balancer: %s", _err) + if listener != nil { + listener.Close() + } + } + }() + if err != nil { + return nil, err + } + localAddress := listener.Addr().String() + + originalServerAddress, localServerURL, err := parseURL(cfg.ServerURL, localAddress) + if err != nil { + return nil, err + } + + lb := &LoadBalancer{ + dialer: &net.Dialer{}, + configFile: filepath.Join(cfg.DataDir, "etc", serviceName+".json"), + localAddress: localAddress, + localServerURL: localServerURL, + originalServerAddress: originalServerAddress, + ServerURL: cfg.ServerURL, + } + + lb.setServers([]string{lb.originalServerAddress}) + + lb.proxy = &tcpproxy.Proxy{ + ListenFunc: func(string, string) (net.Listener, error) { + return listener, nil + }, + } + lb.proxy.AddRoute(serviceName, &tcpproxy.DialProxy{ + Addr: serviceName, + DialContext: lb.dialContext, + }) + + if err := lb.updateConfig(); err != nil { + return nil, err + } + if err := lb.proxy.Start(); err != nil { + return nil, err + } + logrus.Infof("Running load balancer %s -> %v", lb.localAddress, lb.randomServers) + + return lb, nil +} + +func (lb *LoadBalancer) Update(serverAddresses []string) { + if lb == nil { + return + } + if !lb.setServers(serverAddresses) { + return + } + logrus.Infof("Updating load balancer server addresses -> %v", lb.randomServers) + + if err := lb.writeConfig(); err != nil { + logrus.Warnf("Error updating load balancer config: %s", err) + } +} + +func (lb *LoadBalancer) LoadBalancerServerURL() string { + if lb == nil { + return "" + } + return lb.localServerURL +} + +func (lb *LoadBalancer) dialContext(ctx context.Context, network, address string) (net.Conn, error) { + startIndex := lb.nextServerIndex + for { + targetServer := lb.currentServerAddress + + conn, err := lb.dialer.DialContext(ctx, network, targetServer) + if err == nil { + return conn, nil + } + logrus.Warnf("Dial error from load balancer: %s", err) + + newServer, err := lb.nextServer(targetServer) + if err != nil { + return nil, err + } + if targetServer != newServer { + logrus.Warnf("Dial context in load balancer failed over to %s", newServer) + } + if ctx.Err() != nil { + return nil, ctx.Err() + } + + maxIndex := len(lb.randomServers) + if startIndex > maxIndex { + startIndex = maxIndex + } + if lb.nextServerIndex == startIndex { + return nil, errors.New("all servers failed") + } + } +} diff --git a/pkg/agent/loadbalancer/loadbalancer_test.go b/pkg/agent/loadbalancer/loadbalancer_test.go new file mode 100644 index 0000000000..41b5cbaa2c --- /dev/null +++ b/pkg/agent/loadbalancer/loadbalancer_test.go @@ -0,0 +1,183 @@ +package loadbalancer + +import ( + "bufio" + "context" + "errors" + "fmt" + "io/ioutil" + "net" + "net/url" + "os" + "strings" + "testing" + "time" + + "github.com/rancher/k3s/pkg/cli/cmds" +) + +type server struct { + listener net.Listener + conns []net.Conn + prefix string +} + +func createServer(prefix string) (*server, error) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, err + } + s := &server{ + prefix: prefix, + listener: listener, + } + go s.serve() + return s, nil +} + +func (s *server) serve() { + for { + conn, err := s.listener.Accept() + if err != nil { + return + } + s.conns = append(s.conns, conn) + go s.echo(conn) + } +} + +func (s *server) close() { + s.listener.Close() + for _, conn := range s.conns { + conn.Close() + } +} + +func (s *server) echo(conn net.Conn) { + for { + result, err := bufio.NewReader(conn).ReadString('\n') + if err != nil { + return + } + conn.Write([]byte(s.prefix + ":" + result)) + } +} + +func ping(conn net.Conn) (string, error) { + fmt.Fprintf(conn, "ping\n") + result, err := bufio.NewReader(conn).ReadString('\n') + if err != nil { + return "", err + } + return strings.TrimSpace(result), nil +} + +func assertEqual(t *testing.T, a interface{}, b interface{}) { + if a != b { + t.Fatalf("[ %v != %v ]", a, b) + } +} + +func assertNotEqual(t *testing.T, a interface{}, b interface{}) { + if a == b { + t.Fatalf("[ %v == %v ]", a, b) + } +} + +func TestFailOver(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "lb-test") + if err != nil { + assertEqual(t, err, nil) + } + defer os.RemoveAll(tmpDir) + + ogServe, err := createServer("og") + if err != nil { + assertEqual(t, err, nil) + } + + lbServe, err := createServer("lb") + if err != nil { + assertEqual(t, err, nil) + } + + cfg := cmds.Agent{ + ServerURL: fmt.Sprintf("http://%s/", ogServe.listener.Addr().String()), + DataDir: tmpDir, + } + + lb, err := Setup(context.Background(), cfg) + if err != nil { + assertEqual(t, err, nil) + } + + parsedURL, err := url.Parse(lb.LoadBalancerServerURL()) + if err != nil { + assertEqual(t, err, nil) + } + localAddress := parsedURL.Host + + lb.Update([]string{lbServe.listener.Addr().String()}) + + conn1, err := net.Dial("tcp", localAddress) + if err != nil { + assertEqual(t, err, nil) + } + result1, err := ping(conn1) + if err != nil { + assertEqual(t, err, nil) + } + assertEqual(t, result1, "lb:ping") + + lbServe.close() + + _, err = ping(conn1) + assertNotEqual(t, err, nil) + + conn2, err := net.Dial("tcp", localAddress) + if err != nil { + assertEqual(t, err, nil) + } + result2, err := ping(conn2) + if err != nil { + assertEqual(t, err, nil) + } + assertEqual(t, result2, "og:ping") +} + +func TestFailFast(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "lb-test") + if err != nil { + assertEqual(t, err, nil) + } + defer os.RemoveAll(tmpDir) + + cfg := cmds.Agent{ + ServerURL: "http://127.0.0.1:-1/", + DataDir: tmpDir, + } + + lb, err := Setup(context.Background(), cfg) + if err != nil { + assertEqual(t, err, nil) + } + + conn, err := net.Dial("tcp", lb.localAddress) + if err != nil { + assertEqual(t, err, nil) + } + + done := make(chan error) + go func() { + _, err = ping(conn) + done <- err + }() + timeout := time.After(10 * time.Millisecond) + + select { + case err := <-done: + assertNotEqual(t, err, nil) + case <-timeout: + t.Fatal(errors.New("time out")) + } +} diff --git a/pkg/agent/loadbalancer/servers.go b/pkg/agent/loadbalancer/servers.go new file mode 100644 index 0000000000..cb0393e733 --- /dev/null +++ b/pkg/agent/loadbalancer/servers.go @@ -0,0 +1,57 @@ +package loadbalancer + +import ( + "errors" + "math/rand" + "reflect" +) + +func (lb *LoadBalancer) setServers(serverAddresses []string) bool { + serverAddresses, hasOriginalServer := sortServers(serverAddresses, lb.originalServerAddress) + if len(serverAddresses) == 0 { + return false + } + + lb.mutex.Lock() + defer lb.mutex.Unlock() + + if reflect.DeepEqual(serverAddresses, lb.ServerAddresses) { + return false + } + + lb.ServerAddresses = serverAddresses + lb.randomServers = append([]string{}, lb.ServerAddresses...) + rand.Shuffle(len(lb.randomServers), func(i, j int) { + lb.randomServers[i], lb.randomServers[j] = lb.randomServers[j], lb.randomServers[i] + }) + if !hasOriginalServer { + lb.randomServers = append(lb.randomServers, lb.originalServerAddress) + } + lb.currentServerAddress = lb.randomServers[0] + lb.nextServerIndex = 1 + + return true +} + +func (lb *LoadBalancer) nextServer(failedServer string) (string, error) { + lb.mutex.Lock() + defer lb.mutex.Unlock() + + if len(lb.randomServers) == 0 { + return "", errors.New("No servers in load balancer proxy list") + } + if len(lb.randomServers) == 1 { + return lb.currentServerAddress, nil + } + if failedServer != lb.currentServerAddress { + return lb.currentServerAddress, nil + } + if lb.nextServerIndex >= len(lb.randomServers) { + lb.nextServerIndex = 0 + } + + lb.currentServerAddress = lb.randomServers[lb.nextServerIndex] + lb.nextServerIndex++ + + return lb.currentServerAddress, nil +} diff --git a/pkg/agent/loadbalancer/utility.go b/pkg/agent/loadbalancer/utility.go new file mode 100644 index 0000000000..a462da2e23 --- /dev/null +++ b/pkg/agent/loadbalancer/utility.go @@ -0,0 +1,49 @@ +package loadbalancer + +import ( + "errors" + "net/url" + "sort" + "strings" +) + +func parseURL(serverURL, newHost string) (string, string, error) { + parsedURL, err := url.Parse(serverURL) + if err != nil { + return "", "", err + } + if parsedURL.Host == "" { + return "", "", errors.New("Initial server URL host is not defined for load balancer") + } + address := parsedURL.Host + if parsedURL.Port() == "" { + if strings.ToLower(parsedURL.Scheme) == "http" { + address += ":80" + } + if strings.ToLower(parsedURL.Scheme) == "https" { + address += ":443" + } + } + parsedURL.Host = newHost + return address, parsedURL.String(), nil +} + +func sortServers(input []string, search string) ([]string, bool) { + result := []string{} + found := false + skip := map[string]bool{"": true} + + for _, entry := range input { + if skip[entry] { + continue + } + if search == entry { + found = true + } + skip[entry] = true + result = append(result, entry) + } + + sort.Strings(result) + return result, found +} diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 9a4521149f..d6f6c6dc29 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -12,6 +12,7 @@ import ( "github.com/rancher/k3s/pkg/agent/config" "github.com/rancher/k3s/pkg/agent/containerd" "github.com/rancher/k3s/pkg/agent/flannel" + "github.com/rancher/k3s/pkg/agent/loadbalancer" "github.com/rancher/k3s/pkg/agent/syssetup" "github.com/rancher/k3s/pkg/agent/tunnel" "github.com/rancher/k3s/pkg/cli/cmds" @@ -21,7 +22,7 @@ import ( "github.com/sirupsen/logrus" ) -func run(ctx context.Context, cfg cmds.Agent) error { +func run(ctx context.Context, cfg cmds.Agent, lb *loadbalancer.LoadBalancer) error { nodeConfig := config.Get(ctx, cfg) if err := config.HostnameCheck(cfg); err != nil { @@ -47,7 +48,7 @@ func run(ctx context.Context, cfg cmds.Agent) error { return err } - if err := tunnel.Setup(ctx, nodeConfig); err != nil { + if err := tunnel.Setup(ctx, nodeConfig, lb.Update); err != nil { return err } @@ -77,11 +78,20 @@ func Run(ctx context.Context, cfg cmds.Agent) error { } cfg.DataDir = filepath.Join(cfg.DataDir, "agent") + os.MkdirAll(cfg.DataDir, 0700) if cfg.ClusterSecret != "" { cfg.Token = "K10node:" + cfg.ClusterSecret } + lb, err := loadbalancer.Setup(ctx, cfg) + if err != nil { + return err + } + if lb != nil { + cfg.ServerURL = lb.LoadBalancerServerURL() + } + for { tmpFile, err := clientaccess.AgentAccessInfoToTempKubeConfig("", cfg.ServerURL, cfg.Token) if err != nil { @@ -97,8 +107,7 @@ func Run(ctx context.Context, cfg cmds.Agent) error { break } - os.MkdirAll(cfg.DataDir, 0700) - return run(ctx, cfg) + return run(ctx, cfg, lb) } func validate() error { diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index be39e21f19..c139129557 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -53,7 +53,7 @@ func getAddresses(endpoint *v1.Endpoints) []string { return serverAddresses } -func Setup(ctx context.Context, config *config.Node) error { +func Setup(ctx context.Context, config *config.Node, onChange func([]string)) error { restConfig, err := clientcmd.BuildConfigFromFlags("", config.AgentConfig.KubeConfigNode) if err != nil { return err @@ -74,6 +74,9 @@ func Setup(ctx context.Context, config *config.Node) error { endpoint, _ := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{}) if endpoint != nil { addresses = getAddresses(endpoint) + if onChange != nil { + onChange(addresses) + } } disconnect := map[string]context.CancelFunc{} @@ -120,6 +123,9 @@ func Setup(ctx context.Context, config *config.Node) error { } addresses = newAddresses logrus.Infof("Tunnel endpoint watch event: %v", addresses) + if onChange != nil { + onChange(addresses) + } validEndpoint := map[string]bool{} diff --git a/pkg/cli/cmds/agent.go b/pkg/cli/cmds/agent.go index a82f830774..e1364dda12 100644 --- a/pkg/cli/cmds/agent.go +++ b/pkg/cli/cmds/agent.go @@ -11,6 +11,7 @@ type Agent struct { Token string TokenFile string ServerURL string + DisableLoadBalancer bool ResolvConf string DataDir string NodeIP string diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index ac8c05f9d9..fe4d39b1ca 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -214,6 +214,7 @@ func run(app *cli.Context, cfg *cmds.Server) error { agentConfig.ServerURL = url agentConfig.Token = token agentConfig.Labels = append(agentConfig.Labels, "node-role.kubernetes.io/master=true") + agentConfig.DisableLoadBalancer = true return agent.Run(ctx, agentConfig) } From 46894ab74ba85d64c6257ced216f4097229816c7 Mon Sep 17 00:00:00 2001 From: Erik Wilson Date: Thu, 25 Jul 2019 15:32:40 -0700 Subject: [PATCH 2/2] Update vendor --- trash.lock | 2 + vendor.conf | 1 + vendor/github.com/google/tcpproxy/.gitignore | 2 + vendor/github.com/google/tcpproxy/.travis.yml | 49 ++ .../google/tcpproxy/CONTRIBUTING.md | 8 + vendor/github.com/google/tcpproxy/LICENSE | 202 ++++++++ vendor/github.com/google/tcpproxy/README.md | 5 + vendor/github.com/google/tcpproxy/http.go | 125 +++++ vendor/github.com/google/tcpproxy/listener.go | 108 ++++ vendor/github.com/google/tcpproxy/sni.go | 192 +++++++ vendor/github.com/google/tcpproxy/tcpproxy.go | 474 ++++++++++++++++++ 11 files changed, 1168 insertions(+) create mode 100644 vendor/github.com/google/tcpproxy/.gitignore create mode 100644 vendor/github.com/google/tcpproxy/.travis.yml create mode 100644 vendor/github.com/google/tcpproxy/CONTRIBUTING.md create mode 100644 vendor/github.com/google/tcpproxy/LICENSE create mode 100644 vendor/github.com/google/tcpproxy/README.md create mode 100644 vendor/github.com/google/tcpproxy/http.go create mode 100644 vendor/github.com/google/tcpproxy/listener.go create mode 100644 vendor/github.com/google/tcpproxy/sni.go create mode 100644 vendor/github.com/google/tcpproxy/tcpproxy.go diff --git a/trash.lock b/trash.lock index 612a6ea5ff..710ea0bfd7 100755 --- a/trash.lock +++ b/trash.lock @@ -125,6 +125,8 @@ import: version: v1.0.21 - package: github.com/google/gofuzz version: 44d81051d367757e1c7c6a5a86423ece9afcf63c +- package: github.com/google/tcpproxy + version: dfa16c61dad2b18a385dfb351adf71566720535b - package: github.com/googleapis/gnostic version: 0c5108395e2debce0d731cf0287ddf7242066aba - package: github.com/gorilla/mux diff --git a/vendor.conf b/vendor.conf index e30760ab58..bbe340d77b 100644 --- a/vendor.conf +++ b/vendor.conf @@ -31,6 +31,7 @@ gopkg.in/freddierice/go-losetup.v1 fc9adea44124401d8bfef3a97eaf61b5d44cc2c6 github.com/urfave/cli 8e01ec4cd3e2d84ab2fe90d8210528ffbb06d8ff golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c github.com/docker/docker c12f09bf99b54f274a5ae241dd154fa74020cbab +github.com/google/tcpproxy dfa16c61dad2b18a385dfb351adf71566720535b # flannel github.com/golang/glog 23def4e6c14b4da8ac2ed8007337bc5eb5007998 diff --git a/vendor/github.com/google/tcpproxy/.gitignore b/vendor/github.com/google/tcpproxy/.gitignore new file mode 100644 index 0000000000..ab78466b90 --- /dev/null +++ b/vendor/github.com/google/tcpproxy/.gitignore @@ -0,0 +1,2 @@ +tlsrouter +tlsrouter.test diff --git a/vendor/github.com/google/tcpproxy/.travis.yml b/vendor/github.com/google/tcpproxy/.travis.yml new file mode 100644 index 0000000000..9c94088fa3 --- /dev/null +++ b/vendor/github.com/google/tcpproxy/.travis.yml @@ -0,0 +1,49 @@ +language: go +go: +- 1.8 +- tip +os: +- linux +install: +- go get github.com/golang/lint/golint +before_script: +script: +- go get -t ./... +- go build ./... +- go test ./... +- go vet ./... +- golint -set_exit_status . + +jobs: + include: + - stage: deploy + go: 1.8 + install: + - gem install fpm + script: + - go build ./cmd/tlsrouter + - fpm -s dir -t deb -n tlsrouter -v $(date '+%Y%m%d%H%M%S') + --license Apache2 + --vendor "David Anderson " + --maintainer "David Anderson " + --description "TLS SNI router" + --url "https://github.com/google/tlsrouter" + ./tlsrouter=/usr/bin/tlsrouter + ./systemd/tlsrouter.service=/lib/systemd/system/tlsrouter.service + deploy: + - provider: packagecloud + repository: tlsrouter + username: danderson + dist: debian/stretch + skip_cleanup: true + on: + branch: master + token: + secure: gNU3o70EU4oYeIS6pr0K5oLMGqqxrcf41EOv6c/YoHPVdV6Cx4j9NW0/ISgu6a1/Xf2NgWKT5BWwLpAuhmGdALuOz1Ah//YBWd9N8mGHGaC6RpOPDU8/9NkQdBEmjEH9sgX4PNOh1KQ7d7O0OH0g8RqJlJa0MkUYbTtN6KJ29oiUXxKmZM4D/iWB8VonKOnrtx1NwQL8jL8imZyEV/1fknhDwumz2iKeU1le4Neq9zkxwICMLUonmgphlrp+SDb1EOoHxT6cn51bqBQtQUplfC4dN4OQU/CPqE9E1N1noibvN29YA93qfcrjD3I95KT9wzq+3B6he33+kb0Gz+Cj5ypGy4P85l7TuX4CtQg0U3NAlJCk32IfsdjK+o47pdmADij9IIb9yKt+g99FMERkJJY5EInqEsxHlW/vNF5OqQCmpiHstZL4R2XaHEsWh6j77npnjjC1Aea8xZTWr8PTsbSzVkbG7bTmFpZoPH8eEmr4GNuw5gnbi6D1AJDjcA+UdY9s5qZNpzuWOqfhOFxL+zUW+8sHBvcoFw3R+pwHECs2LCL1c0xAC1LtNUnmW/gnwHavtvKkzErjR1P8Xl7obCbeChJjp+b/BcFYlNACldZcuzBAPyPwIdlWVyUonL4bm63upfMEEShiAIDDJ21y7fjsQK7CfPA7g25bpyo+hV8= + - provider: script + on: + branch: master + script: go run scripts/prune_old_versions.go -user=danderson -repo=tlsrouter -distro=debian -version=stretch -package=tlsrouter -arch=amd64 -limit=2 + env: + # Packagecloud API key, for prune_old_versions.go + - secure: "SRcNwt+45QyPS1w9aGxMg9905Y6d9w4mBM29G6iTTnUB5nD7cAk4m+tf834knGSobVXlWcRnTDW8zrHdQ9yX22dPqCpH5qE+qzTmIvxRHrVJRMmPeYvligJ/9jYfHgQbvuRT8cUpIcpCQAla6rw8nXfKTOE3h8XqMP2hdc3DTVOu2HCfKCNco1tJ7is+AIAnFV2Wpsbb3ZsdKFvHvi2RKUfFaX61J1GNt2/XJIlZs8jC6Y1IAC+ftjql9UsAE/WjZ9fL0Ww1b9/LBIIGHXWI3HpVv9WvlhhIxIlJgOVjmU2lbSuj2w/EBDJ9cd1Qe+wJkT3yKzE1NRsNScVjGg+Ku5igJu/XXuaHkIX01+15BqgPduBYRL0atiNQDhqgBiSyVhXZBX9vsgsp0bgpKaBSF++CV18Q9dara8aljqqS33M3imO3I8JmXU10944QA9Wvu7pCYuIzXxhINcDXRvqxBqz5LnFJGwnGqngTrOCSVS2xn7Y+sjmhe1n5cPCEISlozfa9mPYPvMPp8zg3TbATOOM8CVfcpaNscLqa/+SExN3zMwSanjNKrBgoaQcBzGW5mIgSPxhXkWikBgapiEN7+2Y032Lhqdb9dYjH+EuwcnofspDjjMabWxnuJaln+E3/9vZi2ooQrBEtvymUTy4VMSnqwIX5bU7nPdIuQycdWhk=" diff --git a/vendor/github.com/google/tcpproxy/CONTRIBUTING.md b/vendor/github.com/google/tcpproxy/CONTRIBUTING.md new file mode 100644 index 0000000000..188ad870fc --- /dev/null +++ b/vendor/github.com/google/tcpproxy/CONTRIBUTING.md @@ -0,0 +1,8 @@ +Contributions are welcome by pull request. + +You need to sign the Google Contributor License Agreement before your +contributions can be accepted. You can find the individual and organization +level CLAs here: + +Individual: https://cla.developers.google.com/about/google-individual +Organization: https://cla.developers.google.com/about/google-corporate diff --git a/vendor/github.com/google/tcpproxy/LICENSE b/vendor/github.com/google/tcpproxy/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/vendor/github.com/google/tcpproxy/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/google/tcpproxy/README.md b/vendor/github.com/google/tcpproxy/README.md new file mode 100644 index 0000000000..374f52c9ee --- /dev/null +++ b/vendor/github.com/google/tcpproxy/README.md @@ -0,0 +1,5 @@ +# tcpproxy + +For library usage, see https://godoc.org/github.com/google/tcpproxy/ + +For CLI usage, see https://github.com/google/tcpproxy/blob/master/cmd/tlsrouter/README.md diff --git a/vendor/github.com/google/tcpproxy/http.go b/vendor/github.com/google/tcpproxy/http.go new file mode 100644 index 0000000000..d28c66fa88 --- /dev/null +++ b/vendor/github.com/google/tcpproxy/http.go @@ -0,0 +1,125 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tcpproxy + +import ( + "bufio" + "bytes" + "context" + "net/http" +) + +// AddHTTPHostRoute appends a route to the ipPort listener that +// routes to dest if the incoming HTTP/1.x Host header name is +// httpHost. If it doesn't match, rule processing continues for any +// additional routes on ipPort. +// +// The ipPort is any valid net.Listen TCP address. +func (p *Proxy) AddHTTPHostRoute(ipPort, httpHost string, dest Target) { + p.AddHTTPHostMatchRoute(ipPort, equals(httpHost), dest) +} + +// AddHTTPHostMatchRoute appends a route to the ipPort listener that +// routes to dest if the incoming HTTP/1.x Host header name is +// accepted by matcher. If it doesn't match, rule processing continues +// for any additional routes on ipPort. +// +// The ipPort is any valid net.Listen TCP address. +func (p *Proxy) AddHTTPHostMatchRoute(ipPort string, match Matcher, dest Target) { + p.addRoute(ipPort, httpHostMatch{match, dest}) +} + +type httpHostMatch struct { + matcher Matcher + target Target +} + +func (m httpHostMatch) match(br *bufio.Reader) (Target, string) { + hh := httpHostHeader(br) + if m.matcher(context.TODO(), hh) { + return m.target, hh + } + return nil, "" +} + +// httpHostHeader returns the HTTP Host header from br without +// consuming any of its bytes. It returns "" if it can't find one. +func httpHostHeader(br *bufio.Reader) string { + const maxPeek = 4 << 10 + peekSize := 0 + for { + peekSize++ + if peekSize > maxPeek { + b, _ := br.Peek(br.Buffered()) + return httpHostHeaderFromBytes(b) + } + b, err := br.Peek(peekSize) + if n := br.Buffered(); n > peekSize { + b, _ = br.Peek(n) + peekSize = n + } + if len(b) > 0 { + if b[0] < 'A' || b[0] > 'Z' { + // Doesn't look like an HTTP verb + // (GET, POST, etc). + return "" + } + if bytes.Index(b, crlfcrlf) != -1 || bytes.Index(b, lflf) != -1 { + req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(b))) + if err != nil { + return "" + } + if len(req.Header["Host"]) > 1 { + // TODO(bradfitz): what does + // ReadRequest do if there are + // multiple Host headers? + return "" + } + return req.Host + } + } + if err != nil { + return httpHostHeaderFromBytes(b) + } + } +} + +var ( + lfHostColon = []byte("\nHost:") + lfhostColon = []byte("\nhost:") + crlf = []byte("\r\n") + lf = []byte("\n") + crlfcrlf = []byte("\r\n\r\n") + lflf = []byte("\n\n") +) + +func httpHostHeaderFromBytes(b []byte) string { + if i := bytes.Index(b, lfHostColon); i != -1 { + return string(bytes.TrimSpace(untilEOL(b[i+len(lfHostColon):]))) + } + if i := bytes.Index(b, lfhostColon); i != -1 { + return string(bytes.TrimSpace(untilEOL(b[i+len(lfhostColon):]))) + } + return "" +} + +// untilEOL returns v, truncated before the first '\n' byte, if any. +// The returned slice may include a '\r' at the end. +func untilEOL(v []byte) []byte { + if i := bytes.IndexByte(v, '\n'); i != -1 { + return v[:i] + } + return v +} diff --git a/vendor/github.com/google/tcpproxy/listener.go b/vendor/github.com/google/tcpproxy/listener.go new file mode 100644 index 0000000000..1ddc48ee21 --- /dev/null +++ b/vendor/github.com/google/tcpproxy/listener.go @@ -0,0 +1,108 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tcpproxy + +import ( + "io" + "net" + "sync" +) + +// TargetListener implements both net.Listener and Target. +// Matched Targets become accepted connections. +type TargetListener struct { + Address string // Address is the string reported by TargetListener.Addr().String(). + + mu sync.Mutex + cond *sync.Cond + closed bool + nextConn net.Conn +} + +var ( + _ net.Listener = (*TargetListener)(nil) + _ Target = (*TargetListener)(nil) +) + +func (tl *TargetListener) lock() { + tl.mu.Lock() + if tl.cond == nil { + tl.cond = sync.NewCond(&tl.mu) + } +} + +type tcpAddr string + +func (a tcpAddr) Network() string { return "tcp" } +func (a tcpAddr) String() string { return string(a) } + +// Addr returns the listener's Address field as a net.Addr. +func (tl *TargetListener) Addr() net.Addr { return tcpAddr(tl.Address) } + +// Close stops listening for new connections. All new connections +// routed to this listener will be closed. Already accepted +// connections are not closed. +func (tl *TargetListener) Close() error { + tl.lock() + if tl.closed { + tl.mu.Unlock() + return nil + } + tl.closed = true + tl.mu.Unlock() + tl.cond.Broadcast() + return nil +} + +// HandleConn implements the Target interface. It blocks until tl is +// closed or another goroutine has called Accept and received c. +func (tl *TargetListener) HandleConn(c net.Conn) { + tl.lock() + defer tl.mu.Unlock() + for tl.nextConn != nil && !tl.closed { + tl.cond.Wait() + } + if tl.closed { + c.Close() + return + } + tl.nextConn = c + tl.cond.Broadcast() // Signal might be sufficient; verify. + for tl.nextConn == c && !tl.closed { + tl.cond.Wait() + } + if tl.closed { + c.Close() + return + } +} + +// Accept implements the Accept method in the net.Listener interface. +func (tl *TargetListener) Accept() (net.Conn, error) { + tl.lock() + for tl.nextConn == nil && !tl.closed { + tl.cond.Wait() + } + if tl.closed { + tl.mu.Unlock() + return nil, io.EOF + } + c := tl.nextConn + tl.nextConn = nil + tl.mu.Unlock() + tl.cond.Broadcast() // Signal might be sufficient; verify. + + return c, nil +} diff --git a/vendor/github.com/google/tcpproxy/sni.go b/vendor/github.com/google/tcpproxy/sni.go new file mode 100644 index 0000000000..53b53c24d7 --- /dev/null +++ b/vendor/github.com/google/tcpproxy/sni.go @@ -0,0 +1,192 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tcpproxy + +import ( + "bufio" + "bytes" + "context" + "crypto/tls" + "io" + "net" + "strings" +) + +// AddSNIRoute appends a route to the ipPort listener that routes to +// dest if the incoming TLS SNI server name is sni. If it doesn't +// match, rule processing continues for any additional routes on +// ipPort. +// +// By default, the proxy will route all ACME tls-sni-01 challenges +// received on ipPort to all SNI dests. You can disable ACME routing +// with AddStopACMESearch. +// +// The ipPort is any valid net.Listen TCP address. +func (p *Proxy) AddSNIRoute(ipPort, sni string, dest Target) { + p.AddSNIMatchRoute(ipPort, equals(sni), dest) +} + +// AddSNIMatchRoute appends a route to the ipPort listener that routes +// to dest if the incoming TLS SNI server name is accepted by +// matcher. If it doesn't match, rule processing continues for any +// additional routes on ipPort. +// +// By default, the proxy will route all ACME tls-sni-01 challenges +// received on ipPort to all SNI dests. You can disable ACME routing +// with AddStopACMESearch. +// +// The ipPort is any valid net.Listen TCP address. +func (p *Proxy) AddSNIMatchRoute(ipPort string, matcher Matcher, dest Target) { + cfg := p.configFor(ipPort) + if !cfg.stopACME { + if len(cfg.acmeTargets) == 0 { + p.addRoute(ipPort, &acmeMatch{cfg}) + } + cfg.acmeTargets = append(cfg.acmeTargets, dest) + } + + p.addRoute(ipPort, sniMatch{matcher, dest}) +} + +// AddStopACMESearch prevents ACME probing of subsequent SNI routes. +// Any ACME challenges on ipPort for SNI routes previously added +// before this call will still be proxied to all possible SNI +// backends. +func (p *Proxy) AddStopACMESearch(ipPort string) { + p.configFor(ipPort).stopACME = true +} + +type sniMatch struct { + matcher Matcher + target Target +} + +func (m sniMatch) match(br *bufio.Reader) (Target, string) { + sni := clientHelloServerName(br) + if m.matcher(context.TODO(), sni) { + return m.target, sni + } + return nil, "" +} + +// acmeMatch matches "*.acme.invalid" ACME tls-sni-01 challenges and +// searches for a Target in cfg.acmeTargets that has the challenge +// response. +type acmeMatch struct { + cfg *config +} + +func (m *acmeMatch) match(br *bufio.Reader) (Target, string) { + sni := clientHelloServerName(br) + if !strings.HasSuffix(sni, ".acme.invalid") { + return nil, "" + } + + // TODO: cache. ACME issuers will hit multiple times in a short + // burst for each issuance event. A short TTL cache + singleflight + // should have an excellent hit rate. + // TODO: maybe an acme-specific timeout as well? + // TODO: plumb context upwards? + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan Target, len(m.cfg.acmeTargets)) + for _, target := range m.cfg.acmeTargets { + go tryACME(ctx, ch, target, sni) + } + for range m.cfg.acmeTargets { + if target := <-ch; target != nil { + return target, sni + } + } + + // No target was happy with the provided challenge. + return nil, "" +} + +func tryACME(ctx context.Context, ch chan<- Target, dest Target, sni string) { + var ret Target + defer func() { ch <- ret }() + + conn, targetConn := net.Pipe() + defer conn.Close() + go dest.HandleConn(targetConn) + + deadline, ok := ctx.Deadline() + if ok { + conn.SetDeadline(deadline) + } + + client := tls.Client(conn, &tls.Config{ + ServerName: sni, + InsecureSkipVerify: true, + }) + if err := client.Handshake(); err != nil { + // TODO: log? + return + } + certs := client.ConnectionState().PeerCertificates + if len(certs) == 0 { + // TODO: log? + return + } + // acme says the first cert offered by the server must match the + // challenge hostname. + if err := certs[0].VerifyHostname(sni); err != nil { + // TODO: log? + return + } + + // Target presented what looks like a valid challenge + // response, send it back to the matcher. + ret = dest +} + +// clientHelloServerName returns the SNI server name inside the TLS ClientHello, +// without consuming any bytes from br. +// On any error, the empty string is returned. +func clientHelloServerName(br *bufio.Reader) (sni string) { + const recordHeaderLen = 5 + hdr, err := br.Peek(recordHeaderLen) + if err != nil { + return "" + } + const recordTypeHandshake = 0x16 + if hdr[0] != recordTypeHandshake { + return "" // Not TLS. + } + recLen := int(hdr[3])<<8 | int(hdr[4]) // ignoring version in hdr[1:3] + helloBytes, err := br.Peek(recordHeaderLen + recLen) + if err != nil { + return "" + } + tls.Server(sniSniffConn{r: bytes.NewReader(helloBytes)}, &tls.Config{ + GetConfigForClient: func(hello *tls.ClientHelloInfo) (*tls.Config, error) { + sni = hello.ServerName + return nil, nil + }, + }).Handshake() + return +} + +// sniSniffConn is a net.Conn that reads from r, fails on Writes, +// and crashes otherwise. +type sniSniffConn struct { + r io.Reader + net.Conn // nil; crash on any unexpected use +} + +func (c sniSniffConn) Read(p []byte) (int, error) { return c.r.Read(p) } +func (sniSniffConn) Write(p []byte) (int, error) { return 0, io.EOF } diff --git a/vendor/github.com/google/tcpproxy/tcpproxy.go b/vendor/github.com/google/tcpproxy/tcpproxy.go new file mode 100644 index 0000000000..9826d94226 --- /dev/null +++ b/vendor/github.com/google/tcpproxy/tcpproxy.go @@ -0,0 +1,474 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package tcpproxy lets users build TCP proxies, optionally making +// routing decisions based on HTTP/1 Host headers and the SNI hostname +// in TLS connections. +// +// Typical usage: +// +// var p tcpproxy.Proxy +// p.AddHTTPHostRoute(":80", "foo.com", tcpproxy.To("10.0.0.1:8081")) +// p.AddHTTPHostRoute(":80", "bar.com", tcpproxy.To("10.0.0.2:8082")) +// p.AddRoute(":80", tcpproxy.To("10.0.0.1:8081")) // fallback +// p.AddSNIRoute(":443", "foo.com", tcpproxy.To("10.0.0.1:4431")) +// p.AddSNIRoute(":443", "bar.com", tcpproxy.To("10.0.0.2:4432")) +// p.AddRoute(":443", tcpproxy.To("10.0.0.1:4431")) // fallback +// log.Fatal(p.Run()) +// +// Calling Run (or Start) on a proxy also starts all the necessary +// listeners. +// +// For each accepted connection, the rules for that ipPort are +// matched, in order. If one matches (currently HTTP Host, SNI, or +// always), then the connection is handed to the target. +// +// The two predefined Target implementations are: +// +// 1) DialProxy, proxying to another address (use the To func to return a +// DialProxy value), +// +// 2) TargetListener, making the matched connection available via a +// net.Listener.Accept call. +// +// But Target is an interface, so you can also write your own. +// +// Note that tcpproxy does not do any TLS encryption or decryption. It +// only (via DialProxy) copies bytes around. The SNI hostname in the TLS +// header is unencrypted, for better or worse. +// +// This package makes no API stability promises. If you depend on it, +// vendor it. +package tcpproxy + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "log" + "net" + "time" +) + +// Proxy is a proxy. Its zero value is a valid proxy that does +// nothing. Call methods to add routes before calling Start or Run. +// +// The order that routes are added in matters; each is matched in the order +// registered. +type Proxy struct { + configs map[string]*config // ip:port => config + + lns []net.Listener + donec chan struct{} // closed before err + err error // any error from listening + + // ListenFunc optionally specifies an alternate listen + // function. If nil, net.Dial is used. + // The provided net is always "tcp". + ListenFunc func(net, laddr string) (net.Listener, error) +} + +// Matcher reports whether hostname matches the Matcher's criteria. +type Matcher func(ctx context.Context, hostname string) bool + +// equals is a trivial Matcher that implements string equality. +func equals(want string) Matcher { + return func(_ context.Context, got string) bool { + return want == got + } +} + +// config contains the proxying state for one listener. +type config struct { + routes []route + acmeTargets []Target // accumulates targets that should be probed for acme. + stopACME bool // if true, AddSNIRoute doesn't add targets to acmeTargets. +} + +// A route matches a connection to a target. +type route interface { + // match examines the initial bytes of a connection, looking for a + // match. If a match is found, match returns a non-nil Target to + // which the stream should be proxied. match returns nil if the + // connection doesn't match. + // + // match must not consume bytes from the given bufio.Reader, it + // can only Peek. + // + // If an sni or host header was parsed successfully, that will be + // returned as the second parameter. + match(*bufio.Reader) (Target, string) +} + +func (p *Proxy) netListen() func(net, laddr string) (net.Listener, error) { + if p.ListenFunc != nil { + return p.ListenFunc + } + return net.Listen +} + +func (p *Proxy) configFor(ipPort string) *config { + if p.configs == nil { + p.configs = make(map[string]*config) + } + if p.configs[ipPort] == nil { + p.configs[ipPort] = &config{} + } + return p.configs[ipPort] +} + +func (p *Proxy) addRoute(ipPort string, r route) { + cfg := p.configFor(ipPort) + cfg.routes = append(cfg.routes, r) +} + +// AddRoute appends an always-matching route to the ipPort listener, +// directing any connection to dest. +// +// This is generally used as either the only rule (for simple TCP +// proxies), or as the final fallback rule for an ipPort. +// +// The ipPort is any valid net.Listen TCP address. +func (p *Proxy) AddRoute(ipPort string, dest Target) { + p.addRoute(ipPort, fixedTarget{dest}) +} + +type fixedTarget struct { + t Target +} + +func (m fixedTarget) match(*bufio.Reader) (Target, string) { return m.t, "" } + +// Run is calls Start, and then Wait. +// +// It blocks until there's an error. The return value is always +// non-nil. +func (p *Proxy) Run() error { + if err := p.Start(); err != nil { + return err + } + return p.Wait() +} + +// Wait waits for the Proxy to finish running. Currently this can only +// happen if a Listener is closed, or Close is called on the proxy. +// +// It is only valid to call Wait after a successful call to Start. +func (p *Proxy) Wait() error { + <-p.donec + return p.err +} + +// Close closes all the proxy's self-opened listeners. +func (p *Proxy) Close() error { + for _, c := range p.lns { + c.Close() + } + return nil +} + +// Start creates a TCP listener for each unique ipPort from the +// previously created routes and starts the proxy. It returns any +// error from starting listeners. +// +// If it returns a non-nil error, any successfully opened listeners +// are closed. +func (p *Proxy) Start() error { + if p.donec != nil { + return errors.New("already started") + } + p.donec = make(chan struct{}) + errc := make(chan error, len(p.configs)) + p.lns = make([]net.Listener, 0, len(p.configs)) + for ipPort, config := range p.configs { + ln, err := p.netListen()("tcp", ipPort) + if err != nil { + p.Close() + return err + } + p.lns = append(p.lns, ln) + go p.serveListener(errc, ln, config.routes) + } + go p.awaitFirstError(errc) + return nil +} + +func (p *Proxy) awaitFirstError(errc <-chan error) { + p.err = <-errc + close(p.donec) +} + +func (p *Proxy) serveListener(ret chan<- error, ln net.Listener, routes []route) { + for { + c, err := ln.Accept() + if err != nil { + ret <- err + return + } + go p.serveConn(c, routes) + } +} + +// serveConn runs in its own goroutine and matches c against routes. +// It returns whether it matched purely for testing. +func (p *Proxy) serveConn(c net.Conn, routes []route) bool { + br := bufio.NewReader(c) + for _, route := range routes { + if target, hostName := route.match(br); target != nil { + if n := br.Buffered(); n > 0 { + peeked, _ := br.Peek(br.Buffered()) + c = &Conn{ + HostName: hostName, + Peeked: peeked, + Conn: c, + } + } + target.HandleConn(c) + return true + } + } + // TODO: hook for this? + log.Printf("tcpproxy: no routes matched conn %v/%v; closing", c.RemoteAddr().String(), c.LocalAddr().String()) + c.Close() + return false +} + +// Conn is an incoming connection that has had some bytes read from it +// to determine how to route the connection. The Read method stitches +// the peeked bytes and unread bytes back together. +type Conn struct { + // HostName is the hostname field that was sent to the request router. + // In the case of TLS, this is the SNI header, in the case of HTTPHost + // route, it will be the host header. In the case of a fixed + // route, i.e. those created with AddRoute(), this will always be + // empty. This can be useful in the case where further routing decisions + // need to be made in the Target impementation. + HostName string + + // Peeked are the bytes that have been read from Conn for the + // purposes of route matching, but have not yet been consumed + // by Read calls. It set to nil by Read when fully consumed. + Peeked []byte + + // Conn is the underlying connection. + // It can be type asserted against *net.TCPConn or other types + // as needed. It should not be read from directly unless + // Peeked is nil. + net.Conn +} + +func (c *Conn) Read(p []byte) (n int, err error) { + if len(c.Peeked) > 0 { + n = copy(p, c.Peeked) + c.Peeked = c.Peeked[n:] + if len(c.Peeked) == 0 { + c.Peeked = nil + } + return n, nil + } + return c.Conn.Read(p) +} + +// Target is what an incoming matched connection is sent to. +type Target interface { + // HandleConn is called when an incoming connection is + // matched. After the call to HandleConn, the tcpproxy + // package never touches the conn again. Implementations are + // responsible for closing the connection when needed. + // + // The concrete type of conn will be of type *Conn if any + // bytes have been consumed for the purposes of route + // matching. + HandleConn(net.Conn) +} + +// To is shorthand way of writing &tlsproxy.DialProxy{Addr: addr}. +func To(addr string) *DialProxy { + return &DialProxy{Addr: addr} +} + +// DialProxy implements Target by dialing a new connection to Addr +// and then proxying data back and forth. +// +// The To func is a shorthand way of creating a DialProxy. +type DialProxy struct { + // Addr is the TCP address to proxy to. + Addr string + + // KeepAlivePeriod sets the period between TCP keep alives. + // If zero, a default is used. To disable, use a negative number. + // The keep-alive is used for both the client connection and + KeepAlivePeriod time.Duration + + // DialTimeout optionally specifies a dial timeout. + // If zero, a default is used. + // If negative, the timeout is disabled. + DialTimeout time.Duration + + // DialContext optionally specifies an alternate dial function + // for TCP targets. If nil, the standard + // net.Dialer.DialContext method is used. + DialContext func(ctx context.Context, network, address string) (net.Conn, error) + + // OnDialError optionally specifies an alternate way to handle errors dialing Addr. + // If nil, the error is logged and src is closed. + // If non-nil, src is not closed automatically. + OnDialError func(src net.Conn, dstDialErr error) + + // ProxyProtocolVersion optionally specifies the version of + // HAProxy's PROXY protocol to use. The PROXY protocol provides + // connection metadata to the DialProxy target, via a header + // inserted ahead of the client's traffic. The DialProxy target + // must explicitly support and expect the PROXY header; there is + // no graceful downgrade. + // If zero, no PROXY header is sent. Currently, version 1 is supported. + ProxyProtocolVersion int +} + +// UnderlyingConn returns c.Conn if c of type *Conn, +// otherwise it returns c. +func UnderlyingConn(c net.Conn) net.Conn { + if wrap, ok := c.(*Conn); ok { + return wrap.Conn + } + return c +} + +func goCloseConn(c net.Conn) { go c.Close() } + +// HandleConn implements the Target interface. +func (dp *DialProxy) HandleConn(src net.Conn) { + ctx := context.Background() + var cancel context.CancelFunc + if dp.DialTimeout >= 0 { + ctx, cancel = context.WithTimeout(ctx, dp.dialTimeout()) + } + dst, err := dp.dialContext()(ctx, "tcp", dp.Addr) + if cancel != nil { + cancel() + } + if err != nil { + dp.onDialError()(src, err) + return + } + defer goCloseConn(dst) + + if err = dp.sendProxyHeader(dst, src); err != nil { + dp.onDialError()(src, err) + return + } + defer goCloseConn(src) + + if ka := dp.keepAlivePeriod(); ka > 0 { + if c, ok := UnderlyingConn(src).(*net.TCPConn); ok { + c.SetKeepAlive(true) + c.SetKeepAlivePeriod(ka) + } + if c, ok := dst.(*net.TCPConn); ok { + c.SetKeepAlive(true) + c.SetKeepAlivePeriod(ka) + } + } + + errc := make(chan error, 1) + go proxyCopy(errc, src, dst) + go proxyCopy(errc, dst, src) + <-errc +} + +func (dp *DialProxy) sendProxyHeader(w io.Writer, src net.Conn) error { + switch dp.ProxyProtocolVersion { + case 0: + return nil + case 1: + var srcAddr, dstAddr *net.TCPAddr + if a, ok := src.RemoteAddr().(*net.TCPAddr); ok { + srcAddr = a + } + if a, ok := src.LocalAddr().(*net.TCPAddr); ok { + dstAddr = a + } + + if srcAddr == nil || dstAddr == nil { + _, err := io.WriteString(w, "PROXY UNKNOWN\r\n") + return err + } + + family := "TCP4" + if srcAddr.IP.To4() == nil { + family = "TCP6" + } + _, err := fmt.Fprintf(w, "PROXY %s %s %d %s %d\r\n", family, srcAddr.IP, srcAddr.Port, dstAddr.IP, dstAddr.Port) + return err + default: + return fmt.Errorf("PROXY protocol version %d not supported", dp.ProxyProtocolVersion) + } +} + +// proxyCopy is the function that copies bytes around. +// It's a named function instead of a func literal so users get +// named goroutines in debug goroutine stack dumps. +func proxyCopy(errc chan<- error, dst, src net.Conn) { + // Before we unwrap src and/or dst, copy any buffered data. + if wc, ok := src.(*Conn); ok && len(wc.Peeked) > 0 { + if _, err := dst.Write(wc.Peeked); err != nil { + errc <- err + return + } + wc.Peeked = nil + } + + // Unwrap the src and dst from *Conn to *net.TCPConn so Go + // 1.11's splice optimization kicks in. + src = UnderlyingConn(src) + dst = UnderlyingConn(dst) + + _, err := io.Copy(dst, src) + errc <- err +} + +func (dp *DialProxy) keepAlivePeriod() time.Duration { + if dp.KeepAlivePeriod != 0 { + return dp.KeepAlivePeriod + } + return time.Minute +} + +func (dp *DialProxy) dialTimeout() time.Duration { + if dp.DialTimeout > 0 { + return dp.DialTimeout + } + return 10 * time.Second +} + +var defaultDialer = new(net.Dialer) + +func (dp *DialProxy) dialContext() func(ctx context.Context, network, address string) (net.Conn, error) { + if dp.DialContext != nil { + return dp.DialContext + } + return defaultDialer.DialContext +} + +func (dp *DialProxy) onDialError() func(src net.Conn, dstDialErr error) { + if dp.OnDialError != nil { + return dp.OnDialError + } + return func(src net.Conn, dstDialErr error) { + log.Printf("tcpproxy: for incoming conn %v, error dialing %q: %v", src.RemoteAddr().String(), dp.Addr, dstDialErr) + src.Close() + } +}