Merge pull request #46517 from goblain/cli-port-forward-listen

port-forward listen on address
pull/58/head
k8s-ci-robot 2018-10-09 12:38:55 -07:00 committed by GitHub
commit 830e09cc96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 170 additions and 43 deletions

View File

@ -50,6 +50,7 @@ type PortForwardOptions struct {
RESTClient *restclient.RESTClient
Config *restclient.Config
PodClient corev1client.PodsGetter
Address []string
Ports []string
PortForwarder portForwarder
StopChannel chan struct{}
@ -79,6 +80,12 @@ var (
# Listen on port 8888 locally, forwarding to 5000 in the pod
kubectl port-forward pod/mypod 8888:5000
# Listen on port 8888 on all addresses, forwarding to 5000 in the pod
kubectl port-forward --address 0.0.0.0 pod/mypod 8888:5000
# Listen on port 8888 on localhost and selected IP, forwarding to 5000 in the pod
kubectl port-forward --address localhost,10.19.21.23 pod/mypod 8888:5000
# Listen on a random port locally, forwarding to 5000 in the pod
kubectl port-forward pod/mypod :5000`))
)
@ -95,7 +102,7 @@ func NewCmdPortForward(f cmdutil.Factory, streams genericclioptions.IOStreams) *
},
}
cmd := &cobra.Command{
Use: "port-forward TYPE/NAME [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]",
Use: "port-forward TYPE/NAME [options] [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]",
DisableFlagsInUseLine: true,
Short: i18n.T("Forward one or more local ports to a pod"),
Long: portforwardLong,
@ -113,6 +120,7 @@ func NewCmdPortForward(f cmdutil.Factory, streams genericclioptions.IOStreams) *
},
}
cmdutil.AddPodRunningTimeoutFlag(cmd, defaultPodPortForwardWaitTimeout)
cmd.Flags().StringSliceVar(&opts.Address, "address", []string{"localhost"}, "Addresses to listen on (comma separated)")
// TODO support UID
return cmd
}
@ -131,7 +139,7 @@ func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts Po
return err
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
fw, err := portforward.New(dialer, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
if err != nil {
return err
}

View File

@ -39,8 +39,9 @@ const PortForwardProtocolV1Name = "portforward.k8s.io"
// PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request.
type PortForwarder struct {
ports []ForwardedPort
stopChan <-chan struct{}
addresses []listenAddress
ports []ForwardedPort
stopChan <-chan struct{}
dialer httpstream.Dialer
streamConn httpstream.Connection
@ -110,8 +111,52 @@ func parsePorts(ports []string) ([]ForwardedPort, error) {
return forwards, nil
}
// New creates a new PortForwarder.
type listenAddress struct {
address string
protocol string
failureMode string
}
func parseAddresses(addressesToParse []string) ([]listenAddress, error) {
var addresses []listenAddress
parsed := make(map[string]listenAddress)
for _, address := range addressesToParse {
if address == "localhost" {
ip := listenAddress{address: "127.0.0.1", protocol: "tcp4", failureMode: "all"}
parsed[ip.address] = ip
ip = listenAddress{address: "::1", protocol: "tcp6", failureMode: "all"}
parsed[ip.address] = ip
} else if net.ParseIP(address).To4() != nil {
parsed[address] = listenAddress{address: address, protocol: "tcp4", failureMode: "any"}
} else if net.ParseIP(address) != nil {
parsed[address] = listenAddress{address: address, protocol: "tcp6", failureMode: "any"}
} else {
return nil, fmt.Errorf("%s is not a valid IP", address)
}
}
addresses = make([]listenAddress, len(parsed))
id := 0
for _, v := range parsed {
addresses[id] = v
id++
}
return addresses, nil
}
// New creates a new PortForwarder with localhost listen addresses.
func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
return NewOnAddresses(dialer, []string{"localhost"}, ports, stopChan, readyChan, out, errOut)
}
// NewOnAddresses creates a new PortForwarder with custom listen addresses.
func NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
if len(addresses) == 0 {
return nil, errors.New("You must specify at least 1 address")
}
parsedAddresses, err := parseAddresses(addresses)
if err != nil {
return nil, err
}
if len(ports) == 0 {
return nil, errors.New("You must specify at least 1 port")
}
@ -120,12 +165,13 @@ func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, rea
return nil, err
}
return &PortForwarder{
dialer: dialer,
ports: parsedPorts,
stopChan: stopChan,
Ready: readyChan,
out: out,
errOut: errOut,
dialer: dialer,
addresses: parsedAddresses,
ports: parsedPorts,
stopChan: stopChan,
Ready: readyChan,
out: out,
errOut: errOut,
}, nil
}
@ -181,13 +227,26 @@ func (pf *PortForwarder) forward() error {
return nil
}
// listenOnPort delegates tcp4 and tcp6 listener creation and waits for connections on both of these addresses.
// If both listener creation fail, an error is raised.
// listenOnPort delegates listener creation and waits for connections on requested bind addresses.
// An error is raised based on address groups (default and localhost) and their failure modes
func (pf *PortForwarder) listenOnPort(port *ForwardedPort) error {
errTcp4 := pf.listenOnPortAndAddress(port, "tcp4", "127.0.0.1")
errTcp6 := pf.listenOnPortAndAddress(port, "tcp6", "::1")
if errTcp4 != nil && errTcp6 != nil {
return fmt.Errorf("All listeners failed to create with the following errors: %s, %s", errTcp4, errTcp6)
var errors []error
failCounters := make(map[string]int, 2)
successCounters := make(map[string]int, 2)
for _, addr := range pf.addresses {
err := pf.listenOnPortAndAddress(port, addr.protocol, addr.address)
if err != nil {
errors = append(errors, err)
failCounters[addr.failureMode]++
} else {
successCounters[addr.failureMode]++
}
}
if successCounters["all"] == 0 && failCounters["all"] > 0 {
return fmt.Errorf("%s: %v", "Listeners failed to create with the following errors", errors)
}
if failCounters["any"] > 0 {
return fmt.Errorf("%s: %v", "Listeners failed to create with the following errors", errors)
}
return nil
}
@ -216,6 +275,7 @@ func (pf *PortForwarder) getListener(protocol string, hostname string, port *For
localPortUInt, err := strconv.ParseUint(localPort, 10, 16)
if err != nil {
fmt.Fprintf(pf.out, "Failed to forward from %s:%d -> %d\n", hostname, localPortUInt, port.Remote)
return nil, fmt.Errorf("Error parsing local port: %s from %s (%s)", err, listenerAddress, host)
}
port.Local = uint16(localPortUInt)

View File

@ -20,6 +20,7 @@ import (
"net"
"os"
"reflect"
"sort"
"strings"
"testing"
@ -40,23 +41,52 @@ func (d *fakeDialer) Dial(protocols ...string) (httpstream.Connection, string, e
func TestParsePortsAndNew(t *testing.T) {
tests := []struct {
input []string
expected []ForwardedPort
expectParseError bool
expectNewError bool
input []string
addresses []string
expectedPorts []ForwardedPort
expectedAddresses []listenAddress
expectPortParseError bool
expectAddressParseError bool
expectNewError bool
}{
{input: []string{}, expectNewError: true},
{input: []string{"a"}, expectParseError: true, expectNewError: true},
{input: []string{":a"}, expectParseError: true, expectNewError: true},
{input: []string{"-1"}, expectParseError: true, expectNewError: true},
{input: []string{"65536"}, expectParseError: true, expectNewError: true},
{input: []string{"0"}, expectParseError: true, expectNewError: true},
{input: []string{"0:0"}, expectParseError: true, expectNewError: true},
{input: []string{"a:5000"}, expectParseError: true, expectNewError: true},
{input: []string{"5000:a"}, expectParseError: true, expectNewError: true},
{input: []string{"a"}, expectPortParseError: true, expectAddressParseError: false, expectNewError: true},
{input: []string{":a"}, expectPortParseError: true, expectAddressParseError: false, expectNewError: true},
{input: []string{"-1"}, expectPortParseError: true, expectAddressParseError: false, expectNewError: true},
{input: []string{"65536"}, expectPortParseError: true, expectAddressParseError: false, expectNewError: true},
{input: []string{"0"}, expectPortParseError: true, expectAddressParseError: false, expectNewError: true},
{input: []string{"0:0"}, expectPortParseError: true, expectAddressParseError: false, expectNewError: true},
{input: []string{"a:5000"}, expectPortParseError: true, expectAddressParseError: false, expectNewError: true},
{input: []string{"5000:a"}, expectPortParseError: true, expectAddressParseError: false, expectNewError: true},
{input: []string{"5000:5000"}, addresses: []string{"127.0.0.257"}, expectPortParseError: false, expectAddressParseError: true, expectNewError: true},
{input: []string{"5000:5000"}, addresses: []string{"::g"}, expectPortParseError: false, expectAddressParseError: true, expectNewError: true},
{input: []string{"5000:5000"}, addresses: []string{"domain.invalid"}, expectPortParseError: false, expectAddressParseError: true, expectNewError: true},
{
input: []string{"5000", "5000:5000", "8888:5000", "5000:8888", ":5000", "0:5000"},
expected: []ForwardedPort{
input: []string{"5000:5000"},
addresses: []string{"localhost"},
expectedPorts: []ForwardedPort{
{5000, 5000},
},
expectedAddresses: []listenAddress{
{protocol: "tcp4", address: "127.0.0.1", failureMode: "all"},
{protocol: "tcp6", address: "::1", failureMode: "all"},
},
},
{
input: []string{"5000:5000"},
addresses: []string{"localhost", "127.0.0.1"},
expectedPorts: []ForwardedPort{
{5000, 5000},
},
expectedAddresses: []listenAddress{
{protocol: "tcp4", address: "127.0.0.1", failureMode: "any"},
{protocol: "tcp6", address: "::1", failureMode: "all"},
},
},
{
input: []string{"5000", "5000:5000", "8888:5000", "5000:8888", ":5000", "0:5000"},
addresses: []string{"127.0.0.1", "::1"},
expectedPorts: []ForwardedPort{
{5000, 5000},
{5000, 5000},
{8888, 5000},
@ -64,34 +94,63 @@ func TestParsePortsAndNew(t *testing.T) {
{0, 5000},
{0, 5000},
},
expectedAddresses: []listenAddress{
{protocol: "tcp4", address: "127.0.0.1", failureMode: "any"},
{protocol: "tcp6", address: "::1", failureMode: "any"},
},
},
}
for i, test := range tests {
parsed, err := parsePorts(test.input)
parsedPorts, err := parsePorts(test.input)
haveError := err != nil
if e, a := test.expectParseError, haveError; e != a {
if e, a := test.expectPortParseError, haveError; e != a {
t.Fatalf("%d: parsePorts: error expected=%t, got %t: %s", i, e, a, err)
}
// default to localhost
if len(test.addresses) == 0 && len(test.expectedAddresses) == 0 {
test.addresses = []string{"localhost"}
test.expectedAddresses = []listenAddress{{protocol: "tcp4", address: "127.0.0.1"}, {protocol: "tcp6", address: "::1"}}
}
// assert address parser
parsedAddresses, err := parseAddresses(test.addresses)
haveError = err != nil
if e, a := test.expectAddressParseError, haveError; e != a {
t.Fatalf("%d: parseAddresses: error expected=%t, got %t: %s", i, e, a, err)
}
dialer := &fakeDialer{}
expectedStopChan := make(chan struct{})
readyChan := make(chan struct{})
pf, err := New(dialer, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr)
var pf *PortForwarder
if len(test.addresses) > 0 {
pf, err = NewOnAddresses(dialer, test.addresses, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr)
} else {
pf, err = New(dialer, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr)
}
haveError = err != nil
if e, a := test.expectNewError, haveError; e != a {
t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err)
}
if test.expectParseError || test.expectNewError {
if test.expectPortParseError || test.expectAddressParseError || test.expectNewError {
continue
}
for pi, expectedPort := range test.expected {
if e, a := expectedPort.Local, parsed[pi].Local; e != a {
sort.Slice(test.expectedAddresses, func(i, j int) bool { return test.expectedAddresses[i].address < test.expectedAddresses[j].address })
sort.Slice(parsedAddresses, func(i, j int) bool { return parsedAddresses[i].address < parsedAddresses[j].address })
if !reflect.DeepEqual(test.expectedAddresses, parsedAddresses) {
t.Fatalf("%d: expectedAddresses: %v, got: %v", i, test.expectedAddresses, parsedAddresses)
}
for pi, expectedPort := range test.expectedPorts {
if e, a := expectedPort.Local, parsedPorts[pi].Local; e != a {
t.Fatalf("%d: local expected: %d, got: %d", i, e, a)
}
if e, a := expectedPort.Remote, parsed[pi].Remote; e != a {
if e, a := expectedPort.Remote, parsedPorts[pi].Remote; e != a {
t.Fatalf("%d: remote expected: %d, got: %d", i, e, a)
}
}
@ -108,8 +167,8 @@ func TestParsePortsAndNew(t *testing.T) {
if ports, portErr := pf.GetPorts(); portErr != nil {
t.Fatalf("%d: GetPorts: unable to retrieve ports: %s", i, portErr)
} else if !reflect.DeepEqual(test.expected, ports) {
t.Fatalf("%d: ports: expected %#v, got %#v", i, test.expected, ports)
} else if !reflect.DeepEqual(test.expectedPorts, ports) {
t.Fatalf("%d: ports: expected %#v, got %#v", i, test.expectedPorts, ports)
}
if e, a := expectedStopChan, pf.stopChan; e != a {
t.Fatalf("%d: stopChan: expected %#v, got %#v", i, e, a)

View File

@ -49,7 +49,7 @@ const (
// TODO support other ports besides 80
var (
portForwardRegexp = regexp.MustCompile("Forwarding from 127.0.0.1:([0-9]+) -> 80")
portForwardRegexp = regexp.MustCompile("Forwarding from (127.0.0.1|\\[::1\\]):([0-9]+) -> 80")
)
func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bindAddress string) *v1.Pod {
@ -178,11 +178,11 @@ func runPortForward(ns, podName string, port int) *portForwardCommand {
}
portForwardOutput := string(buf[:n])
match := portForwardRegexp.FindStringSubmatch(portForwardOutput)
if len(match) != 2 {
if len(match) != 3 {
framework.Failf("Failed to parse kubectl port-forward output: %s", portForwardOutput)
}
listenPort, err := strconv.Atoi(match[1])
listenPort, err := strconv.Atoi(match[2])
if err != nil {
framework.Failf("Error converting %s to an int: %v", match[1], err)
}