From 700c30f3c7edadb373e5f8afbedfa5f34ceda82d Mon Sep 17 00:00:00 2001 From: deads2k Date: Fri, 27 Jan 2017 10:28:10 -0500 Subject: [PATCH] move portforward to client-go --- pkg/client/tests/BUILD | 5 + .../portfoward_test.go} | 172 +-------- pkg/client/unversioned/BUILD | 1 - pkg/client/unversioned/portforward/BUILD | 51 --- pkg/client/unversioned/portforward/doc.go | 19 - .../unversioned/portforward/portforward.go | 340 ------------------ pkg/kubectl/cmd/BUILD | 2 +- pkg/kubectl/cmd/portforward.go | 2 +- staging/copy.sh | 7 +- .../pkg/util/httpstream/spdy/upgrade.go | 2 +- .../tools/portforward/portforward.go | 7 +- .../tools/portforward/portforward_test.go | 194 ++++++++++ vendor/BUILD | 9 +- 13 files changed, 218 insertions(+), 593 deletions(-) rename pkg/client/{unversioned/portforward/portforward_test.go => tests/portfoward_test.go} (54%) delete mode 100644 pkg/client/unversioned/portforward/BUILD delete mode 100644 pkg/client/unversioned/portforward/doc.go delete mode 100644 pkg/client/unversioned/portforward/portforward.go create mode 100644 staging/src/k8s.io/client-go/tools/portforward/portforward_test.go diff --git a/pkg/client/tests/BUILD b/pkg/client/tests/BUILD index ef68fcb378..d31deb3f56 100644 --- a/pkg/client/tests/BUILD +++ b/pkg/client/tests/BUILD @@ -13,6 +13,7 @@ go_test( srcs = [ "fake_client_test.go", "listwatch_test.go", + "portfoward_test.go", ], library = ":go_default_library", tags = ["automanaged"], @@ -23,13 +24,17 @@ go_test( "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//pkg/client/unversioned/remotecommand:go_default_library", + "//pkg/kubelet/server/portforward:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/fields", "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/pkg/api/install", "//vendor:k8s.io/client-go/rest", "//vendor:k8s.io/client-go/tools/cache", + "//vendor:k8s.io/client-go/tools/portforward", "//vendor:k8s.io/client-go/util/testing", ], ) diff --git a/pkg/client/unversioned/portforward/portforward_test.go b/pkg/client/tests/portfoward_test.go similarity index 54% rename from pkg/client/unversioned/portforward/portforward_test.go rename to pkg/client/tests/portfoward_test.go index e174d0de32..c2c1cb083a 100644 --- a/pkg/client/unversioned/portforward/portforward_test.go +++ b/pkg/client/tests/portfoward_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package portforward +package tests import ( "bytes" @@ -25,186 +25,18 @@ import ( "net/http/httptest" "net/url" "os" - "reflect" "strings" "sync" "testing" "time" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/httpstream" restclient "k8s.io/client-go/rest" + . "k8s.io/client-go/tools/portforward" "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" "k8s.io/kubernetes/pkg/kubelet/server/portforward" ) -type fakeDialer struct { - dialed bool - conn httpstream.Connection - err error - negotiatedProtocol string -} - -func (d *fakeDialer) Dial(protocols ...string) (httpstream.Connection, string, error) { - d.dialed = true - return d.conn, d.negotiatedProtocol, d.err -} - -func TestParsePortsAndNew(t *testing.T) { - tests := []struct { - input []string - expected []ForwardedPort - expectParseError bool - expectNewError bool - }{ - {input: []string{}, expectNewError: true}, - {input: []string{"a"}, expectParseError: true, expectNewError: true}, - {input: []string{":a"}, expectParseError: true, expectNewError: true}, - {input: []string{"-1"}, expectParseError: true, expectNewError: true}, - {input: []string{"65536"}, expectParseError: true, expectNewError: true}, - {input: []string{"0"}, expectParseError: true, expectNewError: true}, - {input: []string{"0:0"}, expectParseError: true, expectNewError: true}, - {input: []string{"a:5000"}, expectParseError: true, expectNewError: true}, - {input: []string{"5000:a"}, expectParseError: true, expectNewError: true}, - { - input: []string{"5000", "5000:5000", "8888:5000", "5000:8888", ":5000", "0:5000"}, - expected: []ForwardedPort{ - {5000, 5000}, - {5000, 5000}, - {8888, 5000}, - {5000, 8888}, - {0, 5000}, - {0, 5000}, - }, - }, - } - - for i, test := range tests { - parsed, err := parsePorts(test.input) - haveError := err != nil - if e, a := test.expectParseError, haveError; e != a { - t.Fatalf("%d: parsePorts: error expected=%t, got %t: %s", i, e, a, err) - } - - dialer := &fakeDialer{} - expectedStopChan := make(chan struct{}) - readyChan := make(chan struct{}) - pf, err := New(dialer, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr) - haveError = err != nil - if e, a := test.expectNewError, haveError; e != a { - t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err) - } - - if test.expectParseError || test.expectNewError { - continue - } - - for pi, expectedPort := range test.expected { - if e, a := expectedPort.Local, parsed[pi].Local; e != a { - t.Fatalf("%d: local expected: %d, got: %d", i, e, a) - } - if e, a := expectedPort.Remote, parsed[pi].Remote; e != a { - t.Fatalf("%d: remote expected: %d, got: %d", i, e, a) - } - } - - if dialer.dialed { - t.Fatalf("%d: expected not dialed", i) - } - if e, a := test.expected, pf.ports; !reflect.DeepEqual(e, a) { - t.Fatalf("%d: ports: expected %#v, got %#v", i, e, a) - } - if e, a := expectedStopChan, pf.stopChan; e != a { - t.Fatalf("%d: stopChan: expected %#v, got %#v", i, e, a) - } - if pf.Ready == nil { - t.Fatalf("%d: Ready should be non-nil", i) - } - } -} - -type GetListenerTestCase struct { - Hostname string - Protocol string - ShouldRaiseError bool - ExpectedListenerAddress string -} - -func TestGetListener(t *testing.T) { - var pf PortForwarder - testCases := []GetListenerTestCase{ - { - Hostname: "localhost", - Protocol: "tcp4", - ShouldRaiseError: false, - ExpectedListenerAddress: "127.0.0.1", - }, - { - Hostname: "127.0.0.1", - Protocol: "tcp4", - ShouldRaiseError: false, - ExpectedListenerAddress: "127.0.0.1", - }, - { - Hostname: "[::1]", - Protocol: "tcp6", - ShouldRaiseError: false, - ExpectedListenerAddress: "::1", - }, - { - Hostname: "[::1]", - Protocol: "tcp4", - ShouldRaiseError: true, - }, - { - Hostname: "127.0.0.1", - Protocol: "tcp6", - ShouldRaiseError: true, - }, - { - // IPv6 address must be put into brackets. This test reveals this. - Hostname: "::1", - Protocol: "tcp6", - ShouldRaiseError: true, - }, - } - - for i, testCase := range testCases { - expectedListenerPort := "12345" - listener, err := pf.getListener(testCase.Protocol, testCase.Hostname, &ForwardedPort{12345, 12345}) - if err != nil && strings.Contains(err.Error(), "cannot assign requested address") { - t.Logf("Can't test #%d: %v", i, err) - continue - } - errorRaised := err != nil - - if testCase.ShouldRaiseError != errorRaised { - t.Errorf("Test case #%d failed: Data %v an error has been raised(%t) where it should not (or reciprocally): %v", i, testCase, testCase.ShouldRaiseError, err) - continue - } - if errorRaised { - continue - } - - if listener == nil { - t.Errorf("Test case #%d did not raise an error but failed in initializing listener", i) - continue - } - - host, port, _ := net.SplitHostPort(listener.Addr().String()) - t.Logf("Asked a %s forward for: %s:%v, got listener %s:%s, expected: %s", testCase.Protocol, testCase.Hostname, 12345, host, port, expectedListenerPort) - if host != testCase.ExpectedListenerAddress { - t.Errorf("Test case #%d failed: Listener does not listen on exepected address: asked %v got %v", i, testCase.ExpectedListenerAddress, host) - } - if port != expectedListenerPort { - t.Errorf("Test case #%d failed: Listener does not listen on exepected port: asked %v got %v", i, expectedListenerPort, port) - - } - listener.Close() - - } -} - // fakePortForwarder simulates port forwarding for testing. It implements // portforward.PortForwarder. type fakePortForwarder struct { diff --git a/pkg/client/unversioned/BUILD b/pkg/client/unversioned/BUILD index de8a9ebcaa..7eab27bc84 100644 --- a/pkg/client/unversioned/BUILD +++ b/pkg/client/unversioned/BUILD @@ -60,7 +60,6 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", - "//pkg/client/unversioned/portforward:all-srcs", "//pkg/client/unversioned/remotecommand:all-srcs", "//pkg/client/unversioned/testclient/simple:all-srcs", ], diff --git a/pkg/client/unversioned/portforward/BUILD b/pkg/client/unversioned/portforward/BUILD deleted file mode 100644 index ce7444cc56..0000000000 --- a/pkg/client/unversioned/portforward/BUILD +++ /dev/null @@ -1,51 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_library( - name = "go_default_library", - srcs = [ - "doc.go", - "portforward.go", - ], - tags = ["automanaged"], - deps = [ - "//pkg/api:go_default_library", - "//pkg/kubelet/server/portforward:go_default_library", - "//vendor:k8s.io/apimachinery/pkg/util/httpstream", - "//vendor:k8s.io/apimachinery/pkg/util/runtime", - ], -) - -go_test( - name = "go_default_test", - srcs = ["portforward_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//pkg/client/unversioned/remotecommand:go_default_library", - "//pkg/kubelet/server/portforward:go_default_library", - "//vendor:k8s.io/apimachinery/pkg/types", - "//vendor:k8s.io/apimachinery/pkg/util/httpstream", - "//vendor:k8s.io/client-go/rest", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/pkg/client/unversioned/portforward/doc.go b/pkg/client/unversioned/portforward/doc.go deleted file mode 100644 index c05f82b0f2..0000000000 --- a/pkg/client/unversioned/portforward/doc.go +++ /dev/null @@ -1,19 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package portforward adds support for SSH-like port forwarding from the client's -// local host to remote containers. -package portforward // import "k8s.io/kubernetes/pkg/client/unversioned/portforward" diff --git a/pkg/client/unversioned/portforward/portforward.go b/pkg/client/unversioned/portforward/portforward.go deleted file mode 100644 index 50def7df29..0000000000 --- a/pkg/client/unversioned/portforward/portforward.go +++ /dev/null @@ -1,340 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package portforward - -import ( - "errors" - "fmt" - "io" - "io/ioutil" - "net" - "net/http" - "strconv" - "strings" - "sync" - - "k8s.io/apimachinery/pkg/util/httpstream" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/kubelet/server/portforward" -) - -// PortForwarder knows how to listen for local connections and forward them to -// a remote pod via an upgraded HTTP request. -type PortForwarder struct { - ports []ForwardedPort - stopChan <-chan struct{} - - dialer httpstream.Dialer - streamConn httpstream.Connection - listeners []io.Closer - Ready chan struct{} - requestIDLock sync.Mutex - requestID int - out io.Writer - errOut io.Writer -} - -// ForwardedPort contains a Local:Remote port pairing. -type ForwardedPort struct { - Local uint16 - Remote uint16 -} - -/* - valid port specifications: - - 5000 - - forwards from localhost:5000 to pod:5000 - - 8888:5000 - - forwards from localhost:8888 to pod:5000 - - 0:5000 - :5000 - - selects a random available local port, - forwards from localhost: to pod:5000 -*/ -func parsePorts(ports []string) ([]ForwardedPort, error) { - var forwards []ForwardedPort - for _, portString := range ports { - parts := strings.Split(portString, ":") - var localString, remoteString string - if len(parts) == 1 { - localString = parts[0] - remoteString = parts[0] - } else if len(parts) == 2 { - localString = parts[0] - if localString == "" { - // support :5000 - localString = "0" - } - remoteString = parts[1] - } else { - return nil, fmt.Errorf("Invalid port format '%s'", portString) - } - - localPort, err := strconv.ParseUint(localString, 10, 16) - if err != nil { - return nil, fmt.Errorf("Error parsing local port '%s': %s", localString, err) - } - - remotePort, err := strconv.ParseUint(remoteString, 10, 16) - if err != nil { - return nil, fmt.Errorf("Error parsing remote port '%s': %s", remoteString, err) - } - if remotePort == 0 { - return nil, fmt.Errorf("Remote port must be > 0") - } - - forwards = append(forwards, ForwardedPort{uint16(localPort), uint16(remotePort)}) - } - - return forwards, nil -} - -// New creates a new PortForwarder. -func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) { - if len(ports) == 0 { - return nil, errors.New("You must specify at least 1 port") - } - parsedPorts, err := parsePorts(ports) - if err != nil { - return nil, err - } - return &PortForwarder{ - dialer: dialer, - ports: parsedPorts, - stopChan: stopChan, - Ready: readyChan, - out: out, - errOut: errOut, - }, nil -} - -// ForwardPorts formats and executes a port forwarding request. The connection will remain -// open until stopChan is closed. -func (pf *PortForwarder) ForwardPorts() error { - defer pf.Close() - - var err error - pf.streamConn, _, err = pf.dialer.Dial(portforward.PortForwardProtocolV1Name) - if err != nil { - return fmt.Errorf("error upgrading connection: %s", err) - } - defer pf.streamConn.Close() - - return pf.forward() -} - -// forward dials the remote host specific in req, upgrades the request, starts -// listeners for each port specified in ports, and forwards local connections -// to the remote host via streams. -func (pf *PortForwarder) forward() error { - var err error - - listenSuccess := false - for _, port := range pf.ports { - err = pf.listenOnPort(&port) - switch { - case err == nil: - listenSuccess = true - default: - if pf.errOut != nil { - fmt.Fprintf(pf.errOut, "Unable to listen on port %d: %v\n", port.Local, err) - } - } - } - - if !listenSuccess { - return fmt.Errorf("Unable to listen on any of the requested ports: %v", pf.ports) - } - - if pf.Ready != nil { - close(pf.Ready) - } - - // wait for interrupt or conn closure - select { - case <-pf.stopChan: - case <-pf.streamConn.CloseChan(): - runtime.HandleError(errors.New("lost connection to pod")) - } - - return nil -} - -// listenOnPort delegates tcp4 and tcp6 listener creation and waits for connections on both of these addresses. -// If both listener creation fail, an error is raised. -func (pf *PortForwarder) listenOnPort(port *ForwardedPort) error { - errTcp4 := pf.listenOnPortAndAddress(port, "tcp4", "127.0.0.1") - errTcp6 := pf.listenOnPortAndAddress(port, "tcp6", "[::1]") - if errTcp4 != nil && errTcp6 != nil { - return fmt.Errorf("All listeners failed to create with the following errors: %s, %s", errTcp4, errTcp6) - } - return nil -} - -// listenOnPortAndAddress delegates listener creation and waits for new connections -// in the background f -func (pf *PortForwarder) listenOnPortAndAddress(port *ForwardedPort, protocol string, address string) error { - listener, err := pf.getListener(protocol, address, port) - if err != nil { - return err - } - pf.listeners = append(pf.listeners, listener) - go pf.waitForConnection(listener, *port) - return nil -} - -// getListener creates a listener on the interface targeted by the given hostname on the given port with -// the given protocol. protocol is in net.Listen style which basically admits values like tcp, tcp4, tcp6 -func (pf *PortForwarder) getListener(protocol string, hostname string, port *ForwardedPort) (net.Listener, error) { - listener, err := net.Listen(protocol, fmt.Sprintf("%s:%d", hostname, port.Local)) - if err != nil { - runtime.HandleError(fmt.Errorf("Unable to create listener: Error %s", err)) - return nil, err - } - listenerAddress := listener.Addr().String() - host, localPort, _ := net.SplitHostPort(listenerAddress) - localPortUInt, err := strconv.ParseUint(localPort, 10, 16) - - if err != nil { - return nil, fmt.Errorf("Error parsing local port: %s from %s (%s)", err, listenerAddress, host) - } - port.Local = uint16(localPortUInt) - if pf.out != nil { - fmt.Fprintf(pf.out, "Forwarding from %s:%d -> %d\n", hostname, localPortUInt, port.Remote) - } - - return listener, nil -} - -// waitForConnection waits for new connections to listener and handles them in -// the background. -func (pf *PortForwarder) waitForConnection(listener net.Listener, port ForwardedPort) { - for { - conn, err := listener.Accept() - if err != nil { - // TODO consider using something like https://github.com/hydrogen18/stoppableListener? - if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") { - runtime.HandleError(fmt.Errorf("Error accepting connection on port %d: %v", port.Local, err)) - } - return - } - go pf.handleConnection(conn, port) - } -} - -func (pf *PortForwarder) nextRequestID() int { - pf.requestIDLock.Lock() - defer pf.requestIDLock.Unlock() - id := pf.requestID - pf.requestID++ - return id -} - -// handleConnection copies data between the local connection and the stream to -// the remote server. -func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { - defer conn.Close() - - if pf.out != nil { - fmt.Fprintf(pf.out, "Handling connection for %d\n", port.Local) - } - - requestID := pf.nextRequestID() - - // create error stream - headers := http.Header{} - headers.Set(api.StreamType, api.StreamTypeError) - headers.Set(api.PortHeader, fmt.Sprintf("%d", port.Remote)) - headers.Set(api.PortForwardRequestIDHeader, strconv.Itoa(requestID)) - errorStream, err := pf.streamConn.CreateStream(headers) - if err != nil { - runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err)) - return - } - // we're not writing to this stream - errorStream.Close() - - errorChan := make(chan error) - go func() { - message, err := ioutil.ReadAll(errorStream) - switch { - case err != nil: - errorChan <- fmt.Errorf("error reading from error stream for port %d -> %d: %v", port.Local, port.Remote, err) - case len(message) > 0: - errorChan <- fmt.Errorf("an error occurred forwarding %d -> %d: %v", port.Local, port.Remote, string(message)) - } - close(errorChan) - }() - - // create data stream - headers.Set(api.StreamType, api.StreamTypeData) - dataStream, err := pf.streamConn.CreateStream(headers) - if err != nil { - runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err)) - return - } - - localError := make(chan struct{}) - remoteDone := make(chan struct{}) - - go func() { - // Copy from the remote side to the local port. - if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err)) - } - - // inform the select below that the remote copy is done - close(remoteDone) - }() - - go func() { - // inform server we're not sending any more data after copy unblocks - defer dataStream.Close() - - // Copy from the local port to the remote side. - if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err)) - // break out of the select below without waiting for the other copy to finish - close(localError) - } - }() - - // wait for either a local->remote error or for copying from remote->local to finish - select { - case <-remoteDone: - case <-localError: - } - - // always expect something on errorChan (it may be nil) - err = <-errorChan - if err != nil { - runtime.HandleError(err) - } -} - -func (pf *PortForwarder) Close() { - // stop all listeners - for _, l := range pf.listeners { - if err := l.Close(); err != nil { - runtime.HandleError(fmt.Errorf("error closing listener: %v", err)) - } - } -} diff --git a/pkg/kubectl/cmd/BUILD b/pkg/kubectl/cmd/BUILD index 6597deb9f9..6378a5859a 100644 --- a/pkg/kubectl/cmd/BUILD +++ b/pkg/kubectl/cmd/BUILD @@ -73,7 +73,6 @@ go_library( "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", "//pkg/client/unversioned:go_default_library", - "//pkg/client/unversioned/portforward:go_default_library", "//pkg/client/unversioned/remotecommand:go_default_library", "//pkg/kubectl:go_default_library", "//pkg/kubectl/cmd/config:go_default_library", @@ -125,6 +124,7 @@ go_library( "//vendor:k8s.io/client-go/discovery", "//vendor:k8s.io/client-go/rest", "//vendor:k8s.io/client-go/tools/clientcmd", + "//vendor:k8s.io/client-go/tools/portforward", ], ) diff --git a/pkg/kubectl/cmd/portforward.go b/pkg/kubectl/cmd/portforward.go index d78f51f9e7..c2ff18c9cc 100644 --- a/pkg/kubectl/cmd/portforward.go +++ b/pkg/kubectl/cmd/portforward.go @@ -27,9 +27,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" "k8s.io/kubernetes/pkg/api" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" - "k8s.io/kubernetes/pkg/client/unversioned/portforward" "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" diff --git a/staging/copy.sh b/staging/copy.sh index 5b60602063..a43c3cc4d1 100755 --- a/staging/copy.sh +++ b/staging/copy.sh @@ -85,6 +85,7 @@ save "tools/auth" save "tools/cache" save "tools/clientcmd" save "tools/metrics" +save "tools/portforward" save "transport" save "third_party" save "plugin" @@ -107,11 +108,6 @@ find "${MAIN_REPO}/pkg/version" -maxdepth 1 -type f | xargs -I{} cp {} "${CLIENT mkcp "pkg/client/clientset_generated/${CLIENTSET}" "pkg/client/clientset_generated" mkcp "/pkg/client/record" "/pkg/client" -mkcp "/pkg/client/unversioned/portforward" "/pkg/client/unversioned" - -# remove this test because it imports the internal clientset -rm "${CLIENT_REPO_TEMP}"/pkg/client/unversioned/portforward/portforward_test.go - pushd "${CLIENT_REPO_TEMP}" > /dev/null echo "generating vendor/" GO15VENDOREXPERIMENT=1 godep save ./... @@ -189,7 +185,6 @@ function mvfolder { mvfolder "pkg/client/clientset_generated/${CLIENTSET}" kubernetes mvfolder pkg/client/record tools/record -mvfolder pkg/client/unversioned/portforward tools/portforward if [ "$(find "${CLIENT_REPO_TEMP}"/pkg/client -type f -name "*.go")" ]; then echo "${CLIENT_REPO_TEMP}/pkg/client is expected to be empty" exit 1 diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go index 2f2ef259a6..97a21876fe 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go @@ -21,8 +21,8 @@ import ( "net/http" "strings" - "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/apimachinery/pkg/util/runtime" ) const HeaderSpdy31 = "SPDY/3.1" diff --git a/staging/src/k8s.io/client-go/tools/portforward/portforward.go b/staging/src/k8s.io/client-go/tools/portforward/portforward.go index 50a1ce6655..2624f2654c 100644 --- a/staging/src/k8s.io/client-go/tools/portforward/portforward.go +++ b/staging/src/k8s.io/client-go/tools/portforward/portforward.go @@ -30,9 +30,12 @@ import ( "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/pkg/api" - "k8s.io/client-go/pkg/kubelet/server/portforward" ) +// TODO move to API machinery and re-unify with kubelet/server/portfoward +// The subprotocol "portforward.k8s.io" is used for port forwarding. +const PortForwardProtocolV1Name = "portforward.k8s.io" + // PortForwarder knows how to listen for local connections and forward them to // a remote pod via an upgraded HTTP request. type PortForwarder struct { @@ -132,7 +135,7 @@ func (pf *PortForwarder) ForwardPorts() error { defer pf.Close() var err error - pf.streamConn, _, err = pf.dialer.Dial(portforward.PortForwardProtocolV1Name) + pf.streamConn, _, err = pf.dialer.Dial(PortForwardProtocolV1Name) if err != nil { return fmt.Errorf("error upgrading connection: %s", err) } diff --git a/staging/src/k8s.io/client-go/tools/portforward/portforward_test.go b/staging/src/k8s.io/client-go/tools/portforward/portforward_test.go new file mode 100644 index 0000000000..2d642b085d --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/portforward/portforward_test.go @@ -0,0 +1,194 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package portforward + +import ( + "net" + "os" + "reflect" + "strings" + "testing" + + "k8s.io/apimachinery/pkg/util/httpstream" +) + +type fakeDialer struct { + dialed bool + conn httpstream.Connection + err error + negotiatedProtocol string +} + +func (d *fakeDialer) Dial(protocols ...string) (httpstream.Connection, string, error) { + d.dialed = true + return d.conn, d.negotiatedProtocol, d.err +} + +func TestParsePortsAndNew(t *testing.T) { + tests := []struct { + input []string + expected []ForwardedPort + expectParseError bool + expectNewError bool + }{ + {input: []string{}, expectNewError: true}, + {input: []string{"a"}, expectParseError: true, expectNewError: true}, + {input: []string{":a"}, expectParseError: true, expectNewError: true}, + {input: []string{"-1"}, expectParseError: true, expectNewError: true}, + {input: []string{"65536"}, expectParseError: true, expectNewError: true}, + {input: []string{"0"}, expectParseError: true, expectNewError: true}, + {input: []string{"0:0"}, expectParseError: true, expectNewError: true}, + {input: []string{"a:5000"}, expectParseError: true, expectNewError: true}, + {input: []string{"5000:a"}, expectParseError: true, expectNewError: true}, + { + input: []string{"5000", "5000:5000", "8888:5000", "5000:8888", ":5000", "0:5000"}, + expected: []ForwardedPort{ + {5000, 5000}, + {5000, 5000}, + {8888, 5000}, + {5000, 8888}, + {0, 5000}, + {0, 5000}, + }, + }, + } + + for i, test := range tests { + parsed, err := parsePorts(test.input) + haveError := err != nil + if e, a := test.expectParseError, haveError; e != a { + t.Fatalf("%d: parsePorts: error expected=%t, got %t: %s", i, e, a, err) + } + + dialer := &fakeDialer{} + expectedStopChan := make(chan struct{}) + readyChan := make(chan struct{}) + pf, err := New(dialer, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr) + haveError = err != nil + if e, a := test.expectNewError, haveError; e != a { + t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err) + } + + if test.expectParseError || test.expectNewError { + continue + } + + for pi, expectedPort := range test.expected { + if e, a := expectedPort.Local, parsed[pi].Local; e != a { + t.Fatalf("%d: local expected: %d, got: %d", i, e, a) + } + if e, a := expectedPort.Remote, parsed[pi].Remote; e != a { + t.Fatalf("%d: remote expected: %d, got: %d", i, e, a) + } + } + + if dialer.dialed { + t.Fatalf("%d: expected not dialed", i) + } + if e, a := test.expected, pf.ports; !reflect.DeepEqual(e, a) { + t.Fatalf("%d: ports: expected %#v, got %#v", i, e, a) + } + if e, a := expectedStopChan, pf.stopChan; e != a { + t.Fatalf("%d: stopChan: expected %#v, got %#v", i, e, a) + } + if pf.Ready == nil { + t.Fatalf("%d: Ready should be non-nil", i) + } + } +} + +type GetListenerTestCase struct { + Hostname string + Protocol string + ShouldRaiseError bool + ExpectedListenerAddress string +} + +func TestGetListener(t *testing.T) { + var pf PortForwarder + testCases := []GetListenerTestCase{ + { + Hostname: "localhost", + Protocol: "tcp4", + ShouldRaiseError: false, + ExpectedListenerAddress: "127.0.0.1", + }, + { + Hostname: "127.0.0.1", + Protocol: "tcp4", + ShouldRaiseError: false, + ExpectedListenerAddress: "127.0.0.1", + }, + { + Hostname: "[::1]", + Protocol: "tcp6", + ShouldRaiseError: false, + ExpectedListenerAddress: "::1", + }, + { + Hostname: "[::1]", + Protocol: "tcp4", + ShouldRaiseError: true, + }, + { + Hostname: "127.0.0.1", + Protocol: "tcp6", + ShouldRaiseError: true, + }, + { + // IPv6 address must be put into brackets. This test reveals this. + Hostname: "::1", + Protocol: "tcp6", + ShouldRaiseError: true, + }, + } + + for i, testCase := range testCases { + expectedListenerPort := "12345" + listener, err := pf.getListener(testCase.Protocol, testCase.Hostname, &ForwardedPort{12345, 12345}) + if err != nil && strings.Contains(err.Error(), "cannot assign requested address") { + t.Logf("Can't test #%d: %v", i, err) + continue + } + errorRaised := err != nil + + if testCase.ShouldRaiseError != errorRaised { + t.Errorf("Test case #%d failed: Data %v an error has been raised(%t) where it should not (or reciprocally): %v", i, testCase, testCase.ShouldRaiseError, err) + continue + } + if errorRaised { + continue + } + + if listener == nil { + t.Errorf("Test case #%d did not raise an error but failed in initializing listener", i) + continue + } + + host, port, _ := net.SplitHostPort(listener.Addr().String()) + t.Logf("Asked a %s forward for: %s:%v, got listener %s:%s, expected: %s", testCase.Protocol, testCase.Hostname, 12345, host, port, expectedListenerPort) + if host != testCase.ExpectedListenerAddress { + t.Errorf("Test case #%d failed: Listener does not listen on exepected address: asked %v got %v", i, testCase.ExpectedListenerAddress, host) + } + if port != expectedListenerPort { + t.Errorf("Test case #%d failed: Listener does not listen on exepected port: asked %v got %v", i, expectedListenerPort, port) + + } + listener.Close() + + } +} diff --git a/vendor/BUILD b/vendor/BUILD index 2c0caedb97..e9bebff5fc 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -12582,7 +12582,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/httpstream", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/client-go/pkg/api", - "//vendor:k8s.io/client-go/pkg/kubelet/server/portforward", ], ) @@ -13999,3 +13998,11 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/openapi", ], ) + +go_test( + name = "k8s.io/client-go/tools/portforward_test", + srcs = ["k8s.io/client-go/tools/portforward/portforward_test.go"], + library = ":k8s.io/client-go/tools/portforward", + tags = ["automanaged"], + deps = ["//vendor:k8s.io/apimachinery/pkg/util/httpstream"], +)