Prepare to introduce websockets for exec and portforward

Refactor the code in remotecommand to better represent the structure of
what is common between portforward and exec.
pull/6/head
Clayton Coleman 2017-07-07 17:54:34 -04:00
parent 95a4a5d6eb
commit 12c7874c0d
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
10 changed files with 151 additions and 126 deletions

View File

@ -132,7 +132,7 @@ func TestForwardPorts(t *testing.T) {
server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends))
url, _ := url.Parse(server.URL)
exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", url)
exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url)
if err != nil {
t.Fatal(err)
}
@ -202,7 +202,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
defer server.Close()
url, _ := url.Parse(server.URL)
exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", url)
exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", url)
if err != nil {
t.Fatal(err)
}

View File

@ -255,7 +255,7 @@ func TestStream(t *testing.T) {
conf := &restclient.Config{
Host: server.URL,
}
e, err := remoteclient.NewExecutor(conf, "POST", req.URL())
e, err := remoteclient.NewSPDYExecutor(conf, "POST", req.URL())
if err != nil {
t.Errorf("%s: unexpected error: %v", name, err)
continue
@ -352,7 +352,7 @@ func TestDial(t *testing.T) {
called = true
return rt
}
exec, err := remoteclient.NewStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"})
exec, err := newStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"})
if err != nil {
t.Fatal(err)
}
@ -368,3 +368,20 @@ func TestDial(t *testing.T) {
}
_ = protocol
}
// newStreamExecutor upgrades the request so that it supports multiplexed bidirectional
// streams. This method takes a stream upgrader and an optional function that is invoked
// to wrap the round tripper. This method may be used by clients that are lower level than
// Kubernetes clients or need to provide their own upgrade round tripper.
func newStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) {
rt := http.RoundTripper(upgrader)
if fn != nil {
rt = fn(rt)
}
return &streamExecutor{
upgrader: upgrader,
transport: rt,
method: method,
url: url,
}, nil
}

View File

@ -132,7 +132,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/jsonmergepatch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/mergepatch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library",

View File

@ -28,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/api"
@ -97,17 +96,16 @@ type RemoteAttach interface {
type DefaultRemoteAttach struct{}
func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
exec, err := remotecommand.NewExecutor(config, method, url)
exec, err := remotecommand.NewSPDYExecutor(config, method, url)
if err != nil {
return err
}
return exec.Stream(remotecommand.StreamOptions{
SupportedProtocols: remotecommandconsts.SupportedStreamingProtocols,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
TerminalSizeQueue: terminalSizeQueue,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
TerminalSizeQueue: terminalSizeQueue,
})
}

View File

@ -25,7 +25,6 @@ import (
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/api"
@ -101,17 +100,16 @@ type RemoteExecutor interface {
type DefaultRemoteExecutor struct{}
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
exec, err := remotecommand.NewExecutor(config, method, url)
exec, err := remotecommand.NewSPDYExecutor(config, method, url)
if err != nil {
return err
}
return exec.Stream(remotecommand.StreamOptions{
SupportedProtocols: remotecommandconsts.SupportedStreamingProtocols,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
TerminalSizeQueue: terminalSizeQueue,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
TerminalSizeQueue: terminalSizeQueue,
})
}

View File

@ -19,6 +19,7 @@ package cmd
import (
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/signal"
@ -102,7 +103,11 @@ type defaultPortForwarder struct {
}
func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
dialer, err := remotecommand.NewExecutor(opts.Config, method, url)
transport, upgrader, err := remotecommand.SPDYRoundTripperFor(opts.Config)
if err != nil {
return err
}
dialer, err := remotecommand.NewSPDYDialer(upgrader, &http.Client{Transport: transport}, method, url)
if err != nil {
return err
}

View File

@ -237,7 +237,7 @@ func TestServePortForward(t *testing.T) {
reqURL, err := url.Parse(resp.Url)
require.NoError(t, err)
exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", reqURL)
exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", reqURL)
require.NoError(t, err)
streamConn, _, err := exec.Dial(kubeletportforward.ProtocolV1Name)
require.NoError(t, err)
@ -297,7 +297,7 @@ func runRemoteCommandTest(t *testing.T, commandType string) {
go func() {
defer wg.Done()
exec, err := remotecommand.NewExecutor(&restclient.Config{}, "POST", reqURL)
exec, err := remotecommand.NewSPDYExecutor(&restclient.Config{}, "POST", reqURL)
require.NoError(t, err)
opts := remotecommand.StreamOptions{

View File

@ -35,12 +35,11 @@ import (
// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
// support terminal resizing.
type StreamOptions struct {
SupportedProtocols []string
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
Tty bool
TerminalSizeQueue TerminalSizeQueue
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
Tty bool
TerminalSizeQueue TerminalSizeQueue
}
// Executor is an interface for transporting shell-style streams.
@ -52,93 +51,10 @@ type Executor interface {
Stream(options StreamOptions) error
}
// StreamExecutor supports the ability to dial an httpstream connection and the ability to
// run a command line stream protocol over that dialer.
type StreamExecutor interface {
Executor
httpstream.Dialer
}
// streamExecutor handles transporting standard shell streams over an httpstream connection.
type streamExecutor struct {
upgrader httpstream.UpgradeRoundTripper
transport http.RoundTripper
method string
url *url.URL
}
// NewExecutor connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams. The current implementation uses SPDY,
// but this could be replaced with HTTP/2 once it's available, or something else.
// TODO: the common code between this and portforward could be abstracted.
func NewExecutor(config *restclient.Config, method string, url *url.URL) (StreamExecutor, error) {
tlsConfig, err := restclient.TLSConfigFor(config)
if err != nil {
return nil, err
}
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true)
wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
if err != nil {
return nil, err
}
return &streamExecutor{
upgrader: upgradeRoundTripper,
transport: wrapper,
method: method,
url: url,
}, nil
}
// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional
// streams. This method takes a stream upgrader and an optional function that is invoked
// to wrap the round tripper. This method may be used by clients that are lower level than
// Kubernetes clients or need to provide their own upgrade round tripper.
func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) {
rt := http.RoundTripper(upgrader)
if fn != nil {
rt = fn(rt)
}
return &streamExecutor{
upgrader: upgrader,
transport: rt,
method: method,
url: url,
}, nil
}
// Dial opens a connection to a remote server and attempts to negotiate a SPDY
// connection. Upon success, it returns the connection and the protocol
// selected by the server.
func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, string, error) {
rt := transport.DebugWrappers(e.transport)
// TODO the client probably shouldn't be created here, as it doesn't allow
// flexibility to allow callers to configure it.
client := &http.Client{Transport: rt}
req, err := http.NewRequest(e.method, e.url.String(), nil)
if err != nil {
return nil, "", fmt.Errorf("error creating request: %v", err)
}
for i := range protocols {
req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
}
resp, err := client.Do(req)
if err != nil {
return nil, "", fmt.Errorf("error sending request: %v", err)
}
defer resp.Body.Close()
conn, err := e.upgrader.NewConnection(resp)
if err != nil {
return nil, "", err
}
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
// SPDYUpgrader validates a response from the server after a SPDY upgrade.
type SPDYUpgrader interface {
// NewConnection validates the response and creates a new Connection.
NewConnection(resp *http.Response) (httpstream.Connection, error)
}
type streamCreator interface {
@ -149,10 +65,105 @@ type streamProtocolHandler interface {
stream(conn streamCreator) error
}
// streamExecutor handles transporting standard shell streams over an httpstream connection.
type streamExecutor struct {
upgrader SPDYUpgrader
transport http.RoundTripper
method string
url *url.URL
}
// NewSPDYExecutor connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams.
func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
wrapper, upgradeRoundTripper, err := SPDYRoundTripperFor(config)
if err != nil {
return nil, err
}
wrapper = transport.DebugWrappers(wrapper)
return &streamExecutor{
upgrader: upgradeRoundTripper,
transport: wrapper,
method: method,
url: url,
}, nil
}
type spdyDialer struct {
client *http.Client
upgrader SPDYUpgrader
method string
url *url.URL
}
func NewSPDYDialer(upgrader SPDYUpgrader, client *http.Client, method string, url *url.URL) (httpstream.Dialer, error) {
return &spdyDialer{
client: client,
upgrader: upgrader,
method: method,
url: url,
}, nil
}
func (d *spdyDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
req, err := http.NewRequest(d.method, d.url.String(), nil)
if err != nil {
return nil, "", fmt.Errorf("error creating request: %v", err)
}
return NegotiateSPDYConnection(d.upgrader, d.client, req, protocols...)
}
// SPDYRoundTripperFor returns a round tripper to use with SPDY.
func SPDYRoundTripperFor(config *restclient.Config) (http.RoundTripper, SPDYUpgrader, error) {
tlsConfig, err := restclient.TLSConfigFor(config)
if err != nil {
return nil, nil, err
}
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true)
wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
if err != nil {
return nil, nil, err
}
return wrapper, upgradeRoundTripper, nil
}
// NegotiateSPDYConnection opens a connection to a remote server and attempts to negotiate
// a SPDY connection. Upon success, it returns the connection and the protocol selected by
// the server. The client transport must use the upgradeRoundTripper - see SPDYRoundTripperFor.
func NegotiateSPDYConnection(upgrader SPDYUpgrader, client *http.Client, req *http.Request, protocols ...string) (httpstream.Connection, string, error) {
for i := range protocols {
req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
}
resp, err := client.Do(req)
if err != nil {
return nil, "", fmt.Errorf("error sending request: %v", err)
}
defer resp.Body.Close()
conn, err := upgrader.NewConnection(resp)
if err != nil {
return nil, "", err
}
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
}
// Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects.
func (e *streamExecutor) Stream(options StreamOptions) error {
conn, protocol, err := e.Dial(options.SupportedProtocols...)
req, err := http.NewRequest(e.method, e.url.String(), nil)
if err != nil {
return fmt.Errorf("error creating request: %v", err)
}
conn, protocol, err := NegotiateSPDYConnection(
e.upgrader,
&http.Client{Transport: e.transport},
req,
remotecommand.StreamProtocolV4Name,
remotecommand.StreamProtocolV3Name,
remotecommand.StreamProtocolV2Name,
remotecommand.StreamProtocolV1Name,
)
if err != nil {
return err
}

View File

@ -123,7 +123,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",

View File

@ -24,7 +24,6 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
remocommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/api"
@ -135,15 +134,14 @@ func (f *Framework) ExecShellInPodWithFullOutput(podName string, cmd string) (st
}
func execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
exec, err := remotecommand.NewExecutor(config, method, url)
exec, err := remotecommand.NewSPDYExecutor(config, method, url)
if err != nil {
return err
}
return exec.Stream(remotecommand.StreamOptions{
SupportedProtocols: remocommandconsts.SupportedStreamingProtocols,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
})
}