Merge pull request #40738 from deads2k/client-18-portforward

Automatic merge from submit-queue (batch tested with PRs 40527, 40738, 39366, 40609, 40748)

move portforward to client-go

Second to last one.  I had to split some tests like we did for apimachinery, but they still run in kubernetes.

@sttts
pull/6/head
Kubernetes Submit Queue 2017-01-31 15:49:42 -08:00 committed by GitHub
commit adbb975389
13 changed files with 218 additions and 593 deletions

View File

@ -13,6 +13,7 @@ go_test(
srcs = [
"fake_client_test.go",
"listwatch_test.go",
"portfoward_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
@ -23,13 +24,17 @@ go_test(
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/unversioned/remotecommand:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/fields",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/pkg/api/install",
"//vendor:k8s.io/client-go/rest",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/portforward",
"//vendor:k8s.io/client-go/util/testing",
],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
package tests
import (
"bytes"
@ -25,186 +25,18 @@ import (
"net/http/httptest"
"net/url"
"os"
"reflect"
"strings"
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/httpstream"
restclient "k8s.io/client-go/rest"
. "k8s.io/client-go/tools/portforward"
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
)
type fakeDialer struct {
dialed bool
conn httpstream.Connection
err error
negotiatedProtocol string
}
func (d *fakeDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
d.dialed = true
return d.conn, d.negotiatedProtocol, d.err
}
func TestParsePortsAndNew(t *testing.T) {
tests := []struct {
input []string
expected []ForwardedPort
expectParseError bool
expectNewError bool
}{
{input: []string{}, expectNewError: true},
{input: []string{"a"}, expectParseError: true, expectNewError: true},
{input: []string{":a"}, expectParseError: true, expectNewError: true},
{input: []string{"-1"}, expectParseError: true, expectNewError: true},
{input: []string{"65536"}, expectParseError: true, expectNewError: true},
{input: []string{"0"}, expectParseError: true, expectNewError: true},
{input: []string{"0:0"}, expectParseError: true, expectNewError: true},
{input: []string{"a:5000"}, expectParseError: true, expectNewError: true},
{input: []string{"5000:a"}, expectParseError: true, expectNewError: true},
{
input: []string{"5000", "5000:5000", "8888:5000", "5000:8888", ":5000", "0:5000"},
expected: []ForwardedPort{
{5000, 5000},
{5000, 5000},
{8888, 5000},
{5000, 8888},
{0, 5000},
{0, 5000},
},
},
}
for i, test := range tests {
parsed, err := parsePorts(test.input)
haveError := err != nil
if e, a := test.expectParseError, haveError; e != a {
t.Fatalf("%d: parsePorts: error expected=%t, got %t: %s", i, e, a, err)
}
dialer := &fakeDialer{}
expectedStopChan := make(chan struct{})
readyChan := make(chan struct{})
pf, err := New(dialer, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr)
haveError = err != nil
if e, a := test.expectNewError, haveError; e != a {
t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err)
}
if test.expectParseError || test.expectNewError {
continue
}
for pi, expectedPort := range test.expected {
if e, a := expectedPort.Local, parsed[pi].Local; e != a {
t.Fatalf("%d: local expected: %d, got: %d", i, e, a)
}
if e, a := expectedPort.Remote, parsed[pi].Remote; e != a {
t.Fatalf("%d: remote expected: %d, got: %d", i, e, a)
}
}
if dialer.dialed {
t.Fatalf("%d: expected not dialed", i)
}
if e, a := test.expected, pf.ports; !reflect.DeepEqual(e, a) {
t.Fatalf("%d: ports: expected %#v, got %#v", i, e, a)
}
if e, a := expectedStopChan, pf.stopChan; e != a {
t.Fatalf("%d: stopChan: expected %#v, got %#v", i, e, a)
}
if pf.Ready == nil {
t.Fatalf("%d: Ready should be non-nil", i)
}
}
}
type GetListenerTestCase struct {
Hostname string
Protocol string
ShouldRaiseError bool
ExpectedListenerAddress string
}
func TestGetListener(t *testing.T) {
var pf PortForwarder
testCases := []GetListenerTestCase{
{
Hostname: "localhost",
Protocol: "tcp4",
ShouldRaiseError: false,
ExpectedListenerAddress: "127.0.0.1",
},
{
Hostname: "127.0.0.1",
Protocol: "tcp4",
ShouldRaiseError: false,
ExpectedListenerAddress: "127.0.0.1",
},
{
Hostname: "[::1]",
Protocol: "tcp6",
ShouldRaiseError: false,
ExpectedListenerAddress: "::1",
},
{
Hostname: "[::1]",
Protocol: "tcp4",
ShouldRaiseError: true,
},
{
Hostname: "127.0.0.1",
Protocol: "tcp6",
ShouldRaiseError: true,
},
{
// IPv6 address must be put into brackets. This test reveals this.
Hostname: "::1",
Protocol: "tcp6",
ShouldRaiseError: true,
},
}
for i, testCase := range testCases {
expectedListenerPort := "12345"
listener, err := pf.getListener(testCase.Protocol, testCase.Hostname, &ForwardedPort{12345, 12345})
if err != nil && strings.Contains(err.Error(), "cannot assign requested address") {
t.Logf("Can't test #%d: %v", i, err)
continue
}
errorRaised := err != nil
if testCase.ShouldRaiseError != errorRaised {
t.Errorf("Test case #%d failed: Data %v an error has been raised(%t) where it should not (or reciprocally): %v", i, testCase, testCase.ShouldRaiseError, err)
continue
}
if errorRaised {
continue
}
if listener == nil {
t.Errorf("Test case #%d did not raise an error but failed in initializing listener", i)
continue
}
host, port, _ := net.SplitHostPort(listener.Addr().String())
t.Logf("Asked a %s forward for: %s:%v, got listener %s:%s, expected: %s", testCase.Protocol, testCase.Hostname, 12345, host, port, expectedListenerPort)
if host != testCase.ExpectedListenerAddress {
t.Errorf("Test case #%d failed: Listener does not listen on exepected address: asked %v got %v", i, testCase.ExpectedListenerAddress, host)
}
if port != expectedListenerPort {
t.Errorf("Test case #%d failed: Listener does not listen on exepected port: asked %v got %v", i, expectedListenerPort, port)
}
listener.Close()
}
}
// fakePortForwarder simulates port forwarding for testing. It implements
// portforward.PortForwarder.
type fakePortForwarder struct {

View File

@ -60,7 +60,6 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/client/unversioned/portforward:all-srcs",
"//pkg/client/unversioned/remotecommand:all-srcs",
"//pkg/client/unversioned/testclient/simple:all-srcs",
],

View File

@ -1,51 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"portforward.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/util/httpstream",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
],
)
go_test(
name = "go_default_test",
srcs = ["portforward_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/client/unversioned/remotecommand:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/httpstream",
"//vendor:k8s.io/client-go/rest",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,19 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package portforward adds support for SSH-like port forwarding from the client's
// local host to remote containers.
package portforward // import "k8s.io/kubernetes/pkg/client/unversioned/portforward"

View File

@ -1,340 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"sync"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
)
// PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request.
type PortForwarder struct {
ports []ForwardedPort
stopChan <-chan struct{}
dialer httpstream.Dialer
streamConn httpstream.Connection
listeners []io.Closer
Ready chan struct{}
requestIDLock sync.Mutex
requestID int
out io.Writer
errOut io.Writer
}
// ForwardedPort contains a Local:Remote port pairing.
type ForwardedPort struct {
Local uint16
Remote uint16
}
/*
valid port specifications:
5000
- forwards from localhost:5000 to pod:5000
8888:5000
- forwards from localhost:8888 to pod:5000
0:5000
:5000
- selects a random available local port,
forwards from localhost:<random port> to pod:5000
*/
func parsePorts(ports []string) ([]ForwardedPort, error) {
var forwards []ForwardedPort
for _, portString := range ports {
parts := strings.Split(portString, ":")
var localString, remoteString string
if len(parts) == 1 {
localString = parts[0]
remoteString = parts[0]
} else if len(parts) == 2 {
localString = parts[0]
if localString == "" {
// support :5000
localString = "0"
}
remoteString = parts[1]
} else {
return nil, fmt.Errorf("Invalid port format '%s'", portString)
}
localPort, err := strconv.ParseUint(localString, 10, 16)
if err != nil {
return nil, fmt.Errorf("Error parsing local port '%s': %s", localString, err)
}
remotePort, err := strconv.ParseUint(remoteString, 10, 16)
if err != nil {
return nil, fmt.Errorf("Error parsing remote port '%s': %s", remoteString, err)
}
if remotePort == 0 {
return nil, fmt.Errorf("Remote port must be > 0")
}
forwards = append(forwards, ForwardedPort{uint16(localPort), uint16(remotePort)})
}
return forwards, nil
}
// New creates a new PortForwarder.
func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
if len(ports) == 0 {
return nil, errors.New("You must specify at least 1 port")
}
parsedPorts, err := parsePorts(ports)
if err != nil {
return nil, err
}
return &PortForwarder{
dialer: dialer,
ports: parsedPorts,
stopChan: stopChan,
Ready: readyChan,
out: out,
errOut: errOut,
}, nil
}
// ForwardPorts formats and executes a port forwarding request. The connection will remain
// open until stopChan is closed.
func (pf *PortForwarder) ForwardPorts() error {
defer pf.Close()
var err error
pf.streamConn, _, err = pf.dialer.Dial(portforward.PortForwardProtocolV1Name)
if err != nil {
return fmt.Errorf("error upgrading connection: %s", err)
}
defer pf.streamConn.Close()
return pf.forward()
}
// forward dials the remote host specific in req, upgrades the request, starts
// listeners for each port specified in ports, and forwards local connections
// to the remote host via streams.
func (pf *PortForwarder) forward() error {
var err error
listenSuccess := false
for _, port := range pf.ports {
err = pf.listenOnPort(&port)
switch {
case err == nil:
listenSuccess = true
default:
if pf.errOut != nil {
fmt.Fprintf(pf.errOut, "Unable to listen on port %d: %v\n", port.Local, err)
}
}
}
if !listenSuccess {
return fmt.Errorf("Unable to listen on any of the requested ports: %v", pf.ports)
}
if pf.Ready != nil {
close(pf.Ready)
}
// wait for interrupt or conn closure
select {
case <-pf.stopChan:
case <-pf.streamConn.CloseChan():
runtime.HandleError(errors.New("lost connection to pod"))
}
return nil
}
// listenOnPort delegates tcp4 and tcp6 listener creation and waits for connections on both of these addresses.
// If both listener creation fail, an error is raised.
func (pf *PortForwarder) listenOnPort(port *ForwardedPort) error {
errTcp4 := pf.listenOnPortAndAddress(port, "tcp4", "127.0.0.1")
errTcp6 := pf.listenOnPortAndAddress(port, "tcp6", "[::1]")
if errTcp4 != nil && errTcp6 != nil {
return fmt.Errorf("All listeners failed to create with the following errors: %s, %s", errTcp4, errTcp6)
}
return nil
}
// listenOnPortAndAddress delegates listener creation and waits for new connections
// in the background f
func (pf *PortForwarder) listenOnPortAndAddress(port *ForwardedPort, protocol string, address string) error {
listener, err := pf.getListener(protocol, address, port)
if err != nil {
return err
}
pf.listeners = append(pf.listeners, listener)
go pf.waitForConnection(listener, *port)
return nil
}
// getListener creates a listener on the interface targeted by the given hostname on the given port with
// the given protocol. protocol is in net.Listen style which basically admits values like tcp, tcp4, tcp6
func (pf *PortForwarder) getListener(protocol string, hostname string, port *ForwardedPort) (net.Listener, error) {
listener, err := net.Listen(protocol, fmt.Sprintf("%s:%d", hostname, port.Local))
if err != nil {
runtime.HandleError(fmt.Errorf("Unable to create listener: Error %s", err))
return nil, err
}
listenerAddress := listener.Addr().String()
host, localPort, _ := net.SplitHostPort(listenerAddress)
localPortUInt, err := strconv.ParseUint(localPort, 10, 16)
if err != nil {
return nil, fmt.Errorf("Error parsing local port: %s from %s (%s)", err, listenerAddress, host)
}
port.Local = uint16(localPortUInt)
if pf.out != nil {
fmt.Fprintf(pf.out, "Forwarding from %s:%d -> %d\n", hostname, localPortUInt, port.Remote)
}
return listener, nil
}
// waitForConnection waits for new connections to listener and handles them in
// the background.
func (pf *PortForwarder) waitForConnection(listener net.Listener, port ForwardedPort) {
for {
conn, err := listener.Accept()
if err != nil {
// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
runtime.HandleError(fmt.Errorf("Error accepting connection on port %d: %v", port.Local, err))
}
return
}
go pf.handleConnection(conn, port)
}
}
func (pf *PortForwarder) nextRequestID() int {
pf.requestIDLock.Lock()
defer pf.requestIDLock.Unlock()
id := pf.requestID
pf.requestID++
return id
}
// handleConnection copies data between the local connection and the stream to
// the remote server.
func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
defer conn.Close()
if pf.out != nil {
fmt.Fprintf(pf.out, "Handling connection for %d\n", port.Local)
}
requestID := pf.nextRequestID()
// create error stream
headers := http.Header{}
headers.Set(api.StreamType, api.StreamTypeError)
headers.Set(api.PortHeader, fmt.Sprintf("%d", port.Remote))
headers.Set(api.PortForwardRequestIDHeader, strconv.Itoa(requestID))
errorStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
return
}
// we're not writing to this stream
errorStream.Close()
errorChan := make(chan error)
go func() {
message, err := ioutil.ReadAll(errorStream)
switch {
case err != nil:
errorChan <- fmt.Errorf("error reading from error stream for port %d -> %d: %v", port.Local, port.Remote, err)
case len(message) > 0:
errorChan <- fmt.Errorf("an error occurred forwarding %d -> %d: %v", port.Local, port.Remote, string(message))
}
close(errorChan)
}()
// create data stream
headers.Set(api.StreamType, api.StreamTypeData)
dataStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
return
}
localError := make(chan struct{})
remoteDone := make(chan struct{})
go func() {
// Copy from the remote side to the local port.
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
}
// inform the select below that the remote copy is done
close(remoteDone)
}()
go func() {
// inform server we're not sending any more data after copy unblocks
defer dataStream.Close()
// Copy from the local port to the remote side.
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
// break out of the select below without waiting for the other copy to finish
close(localError)
}
}()
// wait for either a local->remote error or for copying from remote->local to finish
select {
case <-remoteDone:
case <-localError:
}
// always expect something on errorChan (it may be nil)
err = <-errorChan
if err != nil {
runtime.HandleError(err)
}
}
func (pf *PortForwarder) Close() {
// stop all listeners
for _, l := range pf.listeners {
if err := l.Close(); err != nil {
runtime.HandleError(fmt.Errorf("error closing listener: %v", err))
}
}
}

View File

@ -73,7 +73,6 @@ go_library(
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/client/unversioned:go_default_library",
"//pkg/client/unversioned/portforward:go_default_library",
"//pkg/client/unversioned/remotecommand:go_default_library",
"//pkg/kubectl:go_default_library",
"//pkg/kubectl/cmd/config:go_default_library",
@ -125,6 +124,7 @@ go_library(
"//vendor:k8s.io/client-go/discovery",
"//vendor:k8s.io/client-go/rest",
"//vendor:k8s.io/client-go/tools/clientcmd",
"//vendor:k8s.io/client-go/tools/portforward",
],
)

View File

@ -27,9 +27,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/kubernetes/pkg/api"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/client/unversioned/portforward"
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"

View File

@ -85,6 +85,7 @@ save "tools/auth"
save "tools/cache"
save "tools/clientcmd"
save "tools/metrics"
save "tools/portforward"
save "transport"
save "third_party"
save "plugin"
@ -107,11 +108,6 @@ find "${MAIN_REPO}/pkg/version" -maxdepth 1 -type f | xargs -I{} cp {} "${CLIENT
mkcp "pkg/client/clientset_generated/${CLIENTSET}" "pkg/client/clientset_generated"
mkcp "/pkg/client/record" "/pkg/client"
mkcp "/pkg/client/unversioned/portforward" "/pkg/client/unversioned"
# remove this test because it imports the internal clientset
rm "${CLIENT_REPO_TEMP}"/pkg/client/unversioned/portforward/portforward_test.go
pushd "${CLIENT_REPO_TEMP}" > /dev/null
echo "generating vendor/"
GO15VENDOREXPERIMENT=1 godep save ./...
@ -189,7 +185,6 @@ function mvfolder {
mvfolder "pkg/client/clientset_generated/${CLIENTSET}" kubernetes
mvfolder pkg/client/record tools/record
mvfolder pkg/client/unversioned/portforward tools/portforward
if [ "$(find "${CLIENT_REPO_TEMP}"/pkg/client -type f -name "*.go")" ]; then
echo "${CLIENT_REPO_TEMP}/pkg/client is expected to be empty"
exit 1

View File

@ -21,8 +21,8 @@ import (
"net/http"
"strings"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/runtime"
)
const HeaderSpdy31 = "SPDY/3.1"

View File

@ -30,9 +30,12 @@ import (
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/kubelet/server/portforward"
)
// TODO move to API machinery and re-unify with kubelet/server/portfoward
// The subprotocol "portforward.k8s.io" is used for port forwarding.
const PortForwardProtocolV1Name = "portforward.k8s.io"
// PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request.
type PortForwarder struct {
@ -132,7 +135,7 @@ func (pf *PortForwarder) ForwardPorts() error {
defer pf.Close()
var err error
pf.streamConn, _, err = pf.dialer.Dial(portforward.PortForwardProtocolV1Name)
pf.streamConn, _, err = pf.dialer.Dial(PortForwardProtocolV1Name)
if err != nil {
return fmt.Errorf("error upgrading connection: %s", err)
}

View File

@ -0,0 +1,194 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
import (
"net"
"os"
"reflect"
"strings"
"testing"
"k8s.io/apimachinery/pkg/util/httpstream"
)
type fakeDialer struct {
dialed bool
conn httpstream.Connection
err error
negotiatedProtocol string
}
func (d *fakeDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
d.dialed = true
return d.conn, d.negotiatedProtocol, d.err
}
func TestParsePortsAndNew(t *testing.T) {
tests := []struct {
input []string
expected []ForwardedPort
expectParseError bool
expectNewError bool
}{
{input: []string{}, expectNewError: true},
{input: []string{"a"}, expectParseError: true, expectNewError: true},
{input: []string{":a"}, expectParseError: true, expectNewError: true},
{input: []string{"-1"}, expectParseError: true, expectNewError: true},
{input: []string{"65536"}, expectParseError: true, expectNewError: true},
{input: []string{"0"}, expectParseError: true, expectNewError: true},
{input: []string{"0:0"}, expectParseError: true, expectNewError: true},
{input: []string{"a:5000"}, expectParseError: true, expectNewError: true},
{input: []string{"5000:a"}, expectParseError: true, expectNewError: true},
{
input: []string{"5000", "5000:5000", "8888:5000", "5000:8888", ":5000", "0:5000"},
expected: []ForwardedPort{
{5000, 5000},
{5000, 5000},
{8888, 5000},
{5000, 8888},
{0, 5000},
{0, 5000},
},
},
}
for i, test := range tests {
parsed, err := parsePorts(test.input)
haveError := err != nil
if e, a := test.expectParseError, haveError; e != a {
t.Fatalf("%d: parsePorts: error expected=%t, got %t: %s", i, e, a, err)
}
dialer := &fakeDialer{}
expectedStopChan := make(chan struct{})
readyChan := make(chan struct{})
pf, err := New(dialer, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr)
haveError = err != nil
if e, a := test.expectNewError, haveError; e != a {
t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err)
}
if test.expectParseError || test.expectNewError {
continue
}
for pi, expectedPort := range test.expected {
if e, a := expectedPort.Local, parsed[pi].Local; e != a {
t.Fatalf("%d: local expected: %d, got: %d", i, e, a)
}
if e, a := expectedPort.Remote, parsed[pi].Remote; e != a {
t.Fatalf("%d: remote expected: %d, got: %d", i, e, a)
}
}
if dialer.dialed {
t.Fatalf("%d: expected not dialed", i)
}
if e, a := test.expected, pf.ports; !reflect.DeepEqual(e, a) {
t.Fatalf("%d: ports: expected %#v, got %#v", i, e, a)
}
if e, a := expectedStopChan, pf.stopChan; e != a {
t.Fatalf("%d: stopChan: expected %#v, got %#v", i, e, a)
}
if pf.Ready == nil {
t.Fatalf("%d: Ready should be non-nil", i)
}
}
}
type GetListenerTestCase struct {
Hostname string
Protocol string
ShouldRaiseError bool
ExpectedListenerAddress string
}
func TestGetListener(t *testing.T) {
var pf PortForwarder
testCases := []GetListenerTestCase{
{
Hostname: "localhost",
Protocol: "tcp4",
ShouldRaiseError: false,
ExpectedListenerAddress: "127.0.0.1",
},
{
Hostname: "127.0.0.1",
Protocol: "tcp4",
ShouldRaiseError: false,
ExpectedListenerAddress: "127.0.0.1",
},
{
Hostname: "[::1]",
Protocol: "tcp6",
ShouldRaiseError: false,
ExpectedListenerAddress: "::1",
},
{
Hostname: "[::1]",
Protocol: "tcp4",
ShouldRaiseError: true,
},
{
Hostname: "127.0.0.1",
Protocol: "tcp6",
ShouldRaiseError: true,
},
{
// IPv6 address must be put into brackets. This test reveals this.
Hostname: "::1",
Protocol: "tcp6",
ShouldRaiseError: true,
},
}
for i, testCase := range testCases {
expectedListenerPort := "12345"
listener, err := pf.getListener(testCase.Protocol, testCase.Hostname, &ForwardedPort{12345, 12345})
if err != nil && strings.Contains(err.Error(), "cannot assign requested address") {
t.Logf("Can't test #%d: %v", i, err)
continue
}
errorRaised := err != nil
if testCase.ShouldRaiseError != errorRaised {
t.Errorf("Test case #%d failed: Data %v an error has been raised(%t) where it should not (or reciprocally): %v", i, testCase, testCase.ShouldRaiseError, err)
continue
}
if errorRaised {
continue
}
if listener == nil {
t.Errorf("Test case #%d did not raise an error but failed in initializing listener", i)
continue
}
host, port, _ := net.SplitHostPort(listener.Addr().String())
t.Logf("Asked a %s forward for: %s:%v, got listener %s:%s, expected: %s", testCase.Protocol, testCase.Hostname, 12345, host, port, expectedListenerPort)
if host != testCase.ExpectedListenerAddress {
t.Errorf("Test case #%d failed: Listener does not listen on exepected address: asked %v got %v", i, testCase.ExpectedListenerAddress, host)
}
if port != expectedListenerPort {
t.Errorf("Test case #%d failed: Listener does not listen on exepected port: asked %v got %v", i, expectedListenerPort, port)
}
listener.Close()
}
}

9
vendor/BUILD vendored
View File

@ -12582,7 +12582,6 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/util/httpstream",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/client-go/pkg/api",
"//vendor:k8s.io/client-go/pkg/kubelet/server/portforward",
],
)
@ -13999,3 +13998,11 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/openapi",
],
)
go_test(
name = "k8s.io/client-go/tools/portforward_test",
srcs = ["k8s.io/client-go/tools/portforward/portforward_test.go"],
library = ":k8s.io/client-go/tools/portforward",
tags = ["automanaged"],
deps = ["//vendor:k8s.io/apimachinery/pkg/util/httpstream"],
)