k3s/pkg/client/portforward/portforward.go

301 lines
8.5 KiB
Go
Raw Normal View History

/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
"github.com/golang/glog"
)
type upgrader interface {
upgrade(*client.Request, *client.Config) (httpstream.Connection, error)
}
type defaultUpgrader struct{}
func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) {
return req.Upgrade(config, spdy.NewRoundTripper)
}
// PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request.
type PortForwarder struct {
req *client.Request
config *client.Config
ports []ForwardedPort
stopChan <-chan struct{}
streamConn httpstream.Connection
listeners []io.Closer
upgrader upgrader
Ready chan struct{}
}
// ForwardedPort contains a Local:Remote port pairing.
type ForwardedPort struct {
Local uint16
Remote uint16
}
/*
valid port specifications:
5000
- forwards from localhost:5000 to pod:5000
8888:5000
- forwards from localhost:8888 to pod:5000
0:5000
:5000
- selects a random available local port,
forwards from localhost:<random port> to pod:5000
*/
func parsePorts(ports []string) ([]ForwardedPort, error) {
var forwards []ForwardedPort
for _, portString := range ports {
parts := strings.Split(portString, ":")
var localString, remoteString string
if len(parts) == 1 {
localString = parts[0]
remoteString = parts[0]
} else if len(parts) == 2 {
localString = parts[0]
if localString == "" {
// support :5000
localString = "0"
}
remoteString = parts[1]
} else {
return nil, fmt.Errorf("Invalid port format '%s'", portString)
}
localPort, err := strconv.ParseUint(localString, 10, 16)
if err != nil {
return nil, fmt.Errorf("Error parsing local port '%s': %s", localString, err)
}
remotePort, err := strconv.ParseUint(remoteString, 10, 16)
if err != nil {
return nil, fmt.Errorf("Error parsing remote port '%s': %s", remoteString, err)
}
if remotePort == 0 {
return nil, fmt.Errorf("Remote port must be > 0")
}
forwards = append(forwards, ForwardedPort{uint16(localPort), uint16(remotePort)})
}
return forwards, nil
}
// New creates a new PortForwarder.
func New(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) {
if len(ports) == 0 {
return nil, errors.New("You must specify at least 1 port")
}
parsedPorts, err := parsePorts(ports)
if err != nil {
return nil, err
}
return &PortForwarder{
req: req,
config: config,
ports: parsedPorts,
stopChan: stopChan,
Ready: make(chan struct{}),
}, nil
}
// ForwardPorts formats and executes a port forwarding request. The connection will remain
// open until stopChan is closed.
func (pf *PortForwarder) ForwardPorts() error {
defer pf.Close()
if pf.upgrader == nil {
pf.upgrader = &defaultUpgrader{}
}
var err error
pf.streamConn, err = pf.upgrader.upgrade(pf.req, pf.config)
if err != nil {
return fmt.Errorf("Error upgrading connection: %s", err)
}
defer pf.streamConn.Close()
return pf.forward()
}
// forward dials the remote host specific in req, upgrades the request, starts
// listeners for each port specified in ports, and forwards local connections
// to the remote host via streams.
func (pf *PortForwarder) forward() error {
var err error
listenSuccess := false
for _, port := range pf.ports {
err = pf.listenOnPort(&port)
if err != nil {
glog.Warningf("Unable to listen on port %d: %v", port, err)
}
listenSuccess = true
}
if !listenSuccess {
return fmt.Errorf("Unable to listen on any of the requested ports: %v", pf.ports)
}
close(pf.Ready)
// wait for interrupt or conn closure
select {
case <-pf.stopChan:
case <-pf.streamConn.CloseChan():
glog.Errorf("Lost connection to pod")
}
return nil
}
// listenOnPort creates a new listener on port and waits for new connections
// in the background.
func (pf *PortForwarder) listenOnPort(port *ForwardedPort) error {
listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port.Local))
if err != nil {
return err
}
parts := strings.Split(listener.Addr().String(), ":")
localPort, err := strconv.ParseUint(parts[1], 10, 16)
if err != nil {
return fmt.Errorf("Error parsing local part: %s", err)
}
port.Local = uint16(localPort)
glog.Infof("Forwarding from %d -> %d", localPort, port.Remote)
pf.listeners = append(pf.listeners, listener)
go pf.waitForConnection(listener, *port)
return nil
}
// waitForConnection waits for new connections to listener and handles them in
// the background.
func (pf *PortForwarder) waitForConnection(listener net.Listener, port ForwardedPort) {
for {
conn, err := listener.Accept()
if err != nil {
// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
glog.Errorf("Error accepting connection on port %d: %v", port.Local, err)
}
return
}
go pf.handleConnection(conn, port)
}
}
// handleConnection copies data between the local connection and the stream to
// the remote server.
func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
defer conn.Close()
glog.Infof("Handling connection for %d", port.Local)
errorChan := make(chan error)
doneChan := make(chan struct{}, 2)
// create error stream
headers := http.Header{}
headers.Set(api.StreamType, api.StreamTypeError)
headers.Set(api.PortHeader, fmt.Sprintf("%d", port.Remote))
errorStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
glog.Errorf("Error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err)
return
}
defer errorStream.Reset()
go func() {
message, err := ioutil.ReadAll(errorStream)
if err != nil && err != io.EOF {
errorChan <- fmt.Errorf("Error reading from error stream for port %d -> %d: %v", port.Local, port.Remote, err)
}
if len(message) > 0 {
errorChan <- fmt.Errorf("An error occurred forwarding %d -> %d: %v", port.Local, port.Remote, string(message))
}
}()
// create data stream
headers.Set(api.StreamType, api.StreamTypeData)
dataStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
glog.Errorf("Error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err)
return
}
// Send a Reset when this function exits to completely tear down the stream here
// and in the remote server.
defer dataStream.Reset()
go func() {
// Copy from the remote side to the local port. We won't get an EOF from
// the server as it has no way of knowing when to close the stream. We'll
// take care of closing both ends of the stream with the call to
// stream.Reset() when this function exits.
if _, err := io.Copy(conn, dataStream); err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
glog.Errorf("Error copying from remote stream to local connection: %v", err)
}
doneChan <- struct{}{}
}()
go func() {
// Copy from the local port to the remote side. Here we will be able to know
// when the Copy gets an EOF from conn, as that will happen as soon as conn is
// closed (i.e. client disconnected).
if _, err := io.Copy(dataStream, conn); err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
glog.Errorf("Error copying from local connection to remote stream: %v", err)
}
doneChan <- struct{}{}
}()
select {
case err := <-errorChan:
glog.Error(err)
case <-doneChan:
}
}
func (pf *PortForwarder) Close() {
// stop all listeners
for _, l := range pf.listeners {
if err := l.Close(); err != nil {
glog.Errorf("Error closing listener: %v", err)
}
}
}