Refactor exec to make attach useful without a client.Config

The current executor structure is too dependent on client.Request
and client.Config. In order to do an attach from the server, it needs
to be possible to create an Executor from crypto/tls#TLSConfig and to
bypassing having a client.Request.

Changes:

* remotecommand.spdyExecutor - handles upgrading a request to SPDY and getting a connection
* remotecommand.NewAttach / New - moved to exec / portforward / attach since they handle requests
* Remove request.Upgrade() - it's too coupled to SPDY, and can live with the spdyExecutor
* Add request.VersionedParams(runtime.Object, runtime.ObjectConvertor) to handle object -> query transform
pull/6/head
Clayton Coleman 2015-09-26 20:00:39 -04:00
parent 2f90f660c1
commit 3f1b18fbba
13 changed files with 331 additions and 306 deletions

View File

@ -29,33 +29,19 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
)
type upgrader interface {
upgrade(*client.Request, *client.Config) (httpstream.Connection, error)
}
type defaultUpgrader struct{}
func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) {
return req.Upgrade(config, spdy.NewRoundTripper)
}
// PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request.
type PortForwarder struct {
req *client.Request
config *client.Config
ports []ForwardedPort
stopChan <-chan struct{}
dialer httpstream.Dialer
streamConn httpstream.Connection
listeners []io.Closer
upgrader upgrader
Ready chan struct{}
requestIDLock sync.Mutex
requestID int
@ -120,7 +106,7 @@ func parsePorts(ports []string) ([]ForwardedPort, error) {
}
// New creates a new PortForwarder.
func New(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) {
func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) {
if len(ports) == 0 {
return nil, errors.New("You must specify at least 1 port")
}
@ -128,10 +114,8 @@ func New(req *client.Request, config *client.Config, ports []string, stopChan <-
if err != nil {
return nil, err
}
return &PortForwarder{
req: req,
config: config,
dialer: dialer,
ports: parsedPorts,
stopChan: stopChan,
Ready: make(chan struct{}),
@ -143,11 +127,8 @@ func New(req *client.Request, config *client.Config, ports []string, stopChan <-
func (pf *PortForwarder) ForwardPorts() error {
defer pf.Close()
if pf.upgrader == nil {
pf.upgrader = &defaultUpgrader{}
}
var err error
pf.streamConn, err = pf.upgrader.upgrade(pf.req, pf.config)
pf.streamConn, err = pf.dialer.Dial()
if err != nil {
return fmt.Errorf("error upgrading connection: %s", err)
}

View File

@ -31,10 +31,23 @@ import (
"time"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
"k8s.io/kubernetes/pkg/kubelet"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/httpstream"
)
type fakeDialer struct {
dialed bool
conn httpstream.Connection
err error
}
func (d *fakeDialer) Dial() (httpstream.Connection, error) {
d.dialed = true
return d.conn, d.err
}
func TestParsePortsAndNew(t *testing.T) {
tests := []struct {
input []string
@ -71,10 +84,9 @@ func TestParsePortsAndNew(t *testing.T) {
t.Fatalf("%d: parsePorts: error expected=%t, got %t: %s", i, e, a, err)
}
expectedRequest := &client.Request{}
expectedConfig := &client.Config{}
dialer := &fakeDialer{}
expectedStopChan := make(chan struct{})
pf, err := New(expectedRequest, expectedConfig, test.input, expectedStopChan)
pf, err := New(dialer, test.input, expectedStopChan)
haveError = err != nil
if e, a := test.expectNewError, haveError; e != a {
t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err)
@ -93,11 +105,8 @@ func TestParsePortsAndNew(t *testing.T) {
}
}
if e, a := expectedRequest, pf.req; e != a {
t.Fatalf("%d: req: expected %#v, got %#v", i, e, a)
}
if e, a := expectedConfig, pf.config; e != a {
t.Fatalf("%d: config: expected %#v, got %#v", i, e, a)
if dialer.dialed {
t.Fatalf("%d: expected not dialed", i)
}
if e, a := test.expected, pf.ports; !reflect.DeepEqual(e, a) {
t.Fatalf("%d: ports: expected %#v, got %#v", i, e, a)
@ -293,17 +302,16 @@ func TestForwardPorts(t *testing.T) {
for testName, test := range tests {
server := httptest.NewServer(fakePortForwardServer(t, testName, test.serverSends, test.clientSends))
url, _ := url.ParseRequestURI(server.URL)
c := client.NewRESTClient(url, "x", nil, -1, -1)
req := c.Post().Resource("testing")
conf := &client.Config{
Host: server.URL,
url, _ := url.Parse(server.URL)
exec, err := remotecommand.NewExecutor(&client.Config{}, "POST", url)
if err != nil {
t.Fatal(err)
}
stopChan := make(chan struct{}, 1)
pf, err := New(req, conf, test.ports, stopChan)
pf, err := New(exec, test.ports, stopChan)
if err != nil {
t.Fatalf("%s: unexpected error calling New: %v", testName, err)
}
@ -363,18 +371,17 @@ func TestForwardPorts(t *testing.T) {
func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
server := httptest.NewServer(fakePortForwardServer(t, "allBindsFailed", nil, nil))
defer server.Close()
url, _ := url.ParseRequestURI(server.URL)
c := client.NewRESTClient(url, "x", nil, -1, -1)
req := c.Post().Resource("testing")
conf := &client.Config{
Host: server.URL,
url, _ := url.Parse(server.URL)
exec, err := remotecommand.NewExecutor(&client.Config{}, "POST", url)
if err != nil {
t.Fatal(err)
}
stopChan1 := make(chan struct{}, 1)
defer close(stopChan1)
pf1, err := New(req, conf, []string{"5555"}, stopChan1)
pf1, err := New(exec, []string{"5555"}, stopChan1)
if err != nil {
t.Fatalf("error creating pf1: %v", err)
}
@ -382,7 +389,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
<-pf1.Ready
stopChan2 := make(chan struct{}, 1)
pf2, err := New(&client.Request{}, &client.Config{}, []string{"5555"}, stopChan2)
pf2, err := New(exec, []string{"5555"}, stopChan2)
if err != nil {
t.Fatalf("error creating pf2: %v", err)
}

View File

@ -21,141 +21,127 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"sync"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/conversion/queryparams"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
)
type upgrader interface {
upgrade(*client.Request, *client.Config) (httpstream.Connection, error)
// Executor is an interface for transporting shell-style streams.
type Executor interface {
// Stream initiates the transport of the standard shell streams. It will transport any
// non-nil stream to a remote system, and return an error if a problem occurs. If tty
// is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
// stdout stream).
Stream(stdin io.Reader, stdout, stderr io.Writer, tty bool) error
}
type defaultUpgrader struct{}
func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) {
return req.Upgrade(config, spdy.NewRoundTripper)
// StreamExecutor supports the ability to dial an httpstream connection and the ability to
// run a command line stream protocol over that dialer.
type StreamExecutor interface {
Executor
httpstream.Dialer
}
type Streamer struct {
req *client.Request
config *client.Config
stdin io.Reader
stdout io.Writer
stderr io.Writer
tty bool
// streamExecutor handles transporting standard shell streams over an httpstream connection.
type streamExecutor struct {
upgrader httpstream.UpgradeRoundTripper
transport http.RoundTripper
upgrader upgrader
method string
url *url.URL
}
// Executor executes a command on a pod container
type Executor struct {
Streamer
command []string
}
// New creates a new RemoteCommandExecutor
func New(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Executor {
return &Executor{
command: command,
Streamer: Streamer{
req: req,
config: config,
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
},
}
}
type Attach struct {
Streamer
}
// NewAttach creates a new RemoteAttach
func NewAttach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Attach {
return &Attach{
Streamer: Streamer{
req: req,
config: config,
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
},
}
}
// Execute sends a remote command execution request, upgrading the
// connection and creating streams to represent stdin/stdout/stderr. Data is
// copied between these streams and the supplied stdin/stdout/stderr parameters.
func (e *Attach) Execute() error {
opts := api.PodAttachOptions{
Stdin: (e.stdin != nil),
Stdout: (e.stdout != nil),
Stderr: (!e.tty && e.stderr != nil),
TTY: e.tty,
}
if err := e.setupRequestParameters(&opts); err != nil {
return err
}
return e.doStream()
}
// Execute sends a remote command execution request, upgrading the
// connection and creating streams to represent stdin/stdout/stderr. Data is
// copied between these streams and the supplied stdin/stdout/stderr parameters.
func (e *Executor) Execute() error {
opts := api.PodExecOptions{
Stdin: (e.stdin != nil),
Stdout: (e.stdout != nil),
Stderr: (!e.tty && e.stderr != nil),
TTY: e.tty,
Command: e.command,
}
if err := e.setupRequestParameters(&opts); err != nil {
return err
}
return e.doStream()
}
func (e *Streamer) setupRequestParameters(obj runtime.Object) error {
versioned, err := api.Scheme.ConvertToVersion(obj, e.config.Version)
// NewExecutor connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams. The current implementation uses SPDY,
// but this could be replaced with HTTP/2 once it's available, or something else.
// TODO: the common code between this and portforward could be abstracted.
func NewExecutor(config *client.Config, method string, url *url.URL) (StreamExecutor, error) {
tlsConfig, err := client.TLSConfigFor(config)
if err != nil {
return err
return nil, err
}
params, err := queryparams.Convert(versioned)
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig)
wrapper, err := client.HTTPWrappersForConfig(config, upgradeRoundTripper)
if err != nil {
return err
return nil, err
}
for k, v := range params {
for _, vv := range v {
e.req.Param(k, vv)
}
}
return nil
return &streamExecutor{
upgrader: upgradeRoundTripper,
transport: wrapper,
method: method,
url: url,
}, nil
}
func (e *Streamer) doStream() error {
if e.upgrader == nil {
e.upgrader = &defaultUpgrader{}
// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional
// streams. This method takes a stream upgrader and an optional function that is invoked
// to wrap the round tripper. This method may be used by clients that are lower level than
// Kubernetes clients or need to provide their own upgrade round tripper.
func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) {
var rt http.RoundTripper = upgrader
if fn != nil {
rt = fn(rt)
}
conn, err := e.upgrader.upgrade(e.req, e.config)
return &streamExecutor{
upgrader: upgrader,
transport: rt,
method: method,
url: url,
}, nil
}
// Dial opens a connection to a remote server and attempts to negotiate a SPDY connection.
func (e *streamExecutor) Dial() (httpstream.Connection, error) {
client := &http.Client{Transport: e.transport}
req, err := http.NewRequest(e.method, e.url.String(), nil)
if err != nil {
return nil, fmt.Errorf("error creating request: %s", err)
}
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("error sending request: %s", err)
}
defer resp.Body.Close()
// TODO: handle protocol selection in the future
return e.upgrader.NewConnection(resp)
}
// Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects.
func (e *streamExecutor) Stream(stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
conn, err := e.Dial()
if err != nil {
return err
}
defer conn.Close()
// TODO: negotiate protocols
streamer := &streamProtocol{
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
}
return streamer.stream(conn)
}
type streamProtocol struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
tty bool
}
func (e *streamProtocol) stream(conn httpstream.Connection) error {
headers := http.Header{}
// set up error stream

View File

@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
@ -183,12 +184,17 @@ func TestRequestExecuteRemoteCommand(t *testing.T) {
url, _ := url.ParseRequestURI(server.URL)
c := client.NewRESTClient(url, "x", nil, -1, -1)
req := c.Post().Resource("testing")
req.Param("command", "ls")
req.Param("command", "/")
conf := &client.Config{
Host: server.URL,
}
e := New(req, conf, []string{"ls", "/"}, strings.NewReader(strings.Repeat(testCase.Stdin, testCase.MessageCount)), localOut, localErr, testCase.Tty)
err := e.Execute()
e, err := NewExecutor(conf, "POST", req.URL())
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
err = e.Stream(strings.NewReader(strings.Repeat(testCase.Stdin, testCase.MessageCount)), localOut, localErr, testCase.Tty)
hasErr := err != nil
if len(testCase.Error) > 0 {
@ -263,8 +269,12 @@ func TestRequestAttachRemoteCommand(t *testing.T) {
conf := &client.Config{
Host: server.URL,
}
e := NewAttach(req, conf, strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty)
err := e.Execute()
e, err := NewExecutor(conf, "POST", req.URL())
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
err = e.Stream(strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty)
hasErr := err != nil
if len(testCase.Error) > 0 {
@ -301,3 +311,66 @@ func TestRequestAttachRemoteCommand(t *testing.T) {
server.Close()
}
}
type fakeUpgrader struct {
req *http.Request
resp *http.Response
conn httpstream.Connection
err, connErr error
checkResponse bool
t *testing.T
}
func (u *fakeUpgrader) RoundTrip(req *http.Request) (*http.Response, error) {
u.req = req
return u.resp, u.err
}
func (u *fakeUpgrader) NewConnection(resp *http.Response) (httpstream.Connection, error) {
if u.checkResponse && u.resp != resp {
u.t.Errorf("response objects passed did not match: %#v", resp)
}
return u.conn, u.connErr
}
type fakeConnection struct {
httpstream.Connection
}
// Dial is the common functionality between any stream based upgrader, regardless of protocol.
// This method ensures that someone can use a generic stream executor without being dependent
// on the core Kube client config behavior.
func TestDial(t *testing.T) {
upgrader := &fakeUpgrader{
t: t,
checkResponse: true,
conn: &fakeConnection{},
resp: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(&bytes.Buffer{}),
},
}
var called bool
testFn := func(rt http.RoundTripper) http.RoundTripper {
if rt != upgrader {
t.Fatalf("unexpected round tripper: %#v", rt)
}
called = true
return rt
}
exec, err := NewStreamExecutor(upgrader, testFn, "POST", &url.URL{Host: "something.com", Scheme: "https"})
if err != nil {
t.Fatal(err)
}
conn, err := exec.Dial()
if err != nil {
t.Fatal(err)
}
if conn != upgrader.conn {
t.Errorf("unexpected connection: %#v", conn)
}
if !called {
t.Errorf("wrapper not called")
}
}

View File

@ -18,7 +18,6 @@ package unversioned
import (
"bytes"
"crypto/tls"
"fmt"
"io"
"io/ioutil"
@ -35,11 +34,11 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/metrics"
"k8s.io/kubernetes/pkg/conversion/queryparams"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
watchjson "k8s.io/kubernetes/pkg/watch/json"
@ -406,6 +405,31 @@ func (r *Request) Param(paramName, s string) *Request {
return r.setParam(paramName, s)
}
// VersionedParams will take the provided object, serialize it to a map[string][]string using the
// implicit RESTClient API version and the provided object convertor, and then add those as parameters
// to the request. Use this to provide versioned query parameters from client libraries.
func (r *Request) VersionedParams(obj runtime.Object, convertor runtime.ObjectConvertor) *Request {
if r.err != nil {
return r
}
versioned, err := convertor.ConvertToVersion(obj, r.apiVersion)
if err != nil {
r.err = err
return r
}
params, err := queryparams.Convert(versioned)
if err != nil {
r.err = err
return r
}
for k, v := range params {
for _, vv := range v {
r.setParam(k, vv)
}
}
return r
}
func (r *Request) setParam(paramName, value string) *Request {
if specialParams.Has(paramName) {
r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName)
@ -613,41 +637,6 @@ func (r *Request) Stream() (io.ReadCloser, error) {
}
}
// Upgrade upgrades the request so that it supports multiplexed bidirectional
// streams. The current implementation uses SPDY, but this could be replaced
// with HTTP/2 once it's available, or something else.
func (r *Request) Upgrade(config *Config, newRoundTripperFunc func(*tls.Config) httpstream.UpgradeRoundTripper) (httpstream.Connection, error) {
if r.err != nil {
return nil, r.err
}
tlsConfig, err := TLSConfigFor(config)
if err != nil {
return nil, err
}
upgradeRoundTripper := newRoundTripperFunc(tlsConfig)
wrapper, err := HTTPWrappersForConfig(config, upgradeRoundTripper)
if err != nil {
return nil, err
}
r.client = &http.Client{Transport: wrapper}
req, err := http.NewRequest(r.verb, r.URL().String(), nil)
if err != nil {
return nil, fmt.Errorf("Error creating request: %s", err)
}
resp, err := r.client.Do(req)
if err != nil {
return nil, fmt.Errorf("Error sending request: %s", err)
}
defer resp.Body.Close()
return upgradeRoundTripper.NewConnection(resp)
}
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It wil invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the

View File

@ -18,7 +18,6 @@ package unversioned
import (
"bytes"
"crypto/tls"
"encoding/base64"
"errors"
"io"
@ -158,6 +157,22 @@ func TestRequestParam(t *testing.T) {
}
}
func TestRequestVersionedParams(t *testing.T) {
r := (&Request{}).Param("foo", "a")
if !api.Semantic.DeepDerivative(r.params, url.Values{"foo": []string{"a"}}) {
t.Errorf("should have set a param: %#v", r)
}
r.VersionedParams(&api.PodLogOptions{Follow: true, Container: "bar"}, api.Scheme)
if !api.Semantic.DeepDerivative(r.params, url.Values{
"foo": []string{"a"},
"container": []string{"bar"},
"follow": []string{"1"},
}) {
t.Errorf("should have set a param: %#v", r)
}
}
func TestRequestURI(t *testing.T) {
r := (&Request{}).Param("foo", "a")
r.Prefix("other")
@ -595,88 +610,6 @@ func (f *fakeUpgradeRoundTripper) NewConnection(resp *http.Response) (httpstream
return f.conn, nil
}
func TestRequestUpgrade(t *testing.T) {
uri, _ := url.Parse("http://localhost/")
testCases := []struct {
Request *Request
Config *Config
RoundTripper *fakeUpgradeRoundTripper
Err bool
AuthBasicHeader bool
AuthBearerHeader bool
}{
{
Request: &Request{err: errors.New("bail")},
Err: true,
},
{
Request: &Request{},
Config: &Config{
TLSClientConfig: TLSClientConfig{
CAFile: "foo",
},
Insecure: true,
},
Err: true,
},
{
Request: &Request{},
Config: &Config{
Username: "u",
Password: "p",
BearerToken: "b",
},
Err: true,
},
{
Request: NewRequest(nil, "", uri, testapi.Default.Version(), testapi.Default.Codec()),
Config: &Config{
Username: "u",
Password: "p",
},
AuthBasicHeader: true,
Err: false,
},
{
Request: NewRequest(nil, "", uri, testapi.Default.Version(), testapi.Default.Codec()),
Config: &Config{
BearerToken: "b",
},
AuthBearerHeader: true,
Err: false,
},
}
for i, testCase := range testCases {
r := testCase.Request
rt := &fakeUpgradeRoundTripper{}
expectedConn := &fakeUpgradeConnection{}
conn, err := r.Upgrade(testCase.Config, func(config *tls.Config) httpstream.UpgradeRoundTripper {
rt.conn = expectedConn
return rt
})
_ = conn
hasErr := err != nil
if hasErr != testCase.Err {
t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, r.err)
}
if testCase.Err {
continue
}
if testCase.AuthBasicHeader && !strings.Contains(rt.req.Header.Get("Authorization"), "Basic") {
t.Errorf("%d: expected basic auth header, got: %s", i, rt.req.Header.Get("Authorization"))
}
if testCase.AuthBearerHeader && !strings.Contains(rt.req.Header.Get("Authorization"), "Bearer") {
t.Errorf("%d: expected bearer auth header, got: %s", i, rt.req.Header.Get("Authorization"))
}
if e, a := expectedConn, conn; e != a {
t.Errorf("%d: conn: expected %#v, got %#v", i, e, a)
}
}
}
func TestRequestDo(t *testing.T) {
testCases := []struct {
Request *Request

View File

@ -19,6 +19,7 @@ package cmd
import (
"fmt"
"io"
"net/url"
"os"
"os/signal"
"syscall"
@ -72,15 +73,18 @@ func NewCmdAttach(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer)
// RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing
type RemoteAttach interface {
Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
}
// DefaultRemoteAttach is the standard implementation of attaching
type DefaultRemoteAttach struct{}
func (*DefaultRemoteAttach) Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
attach := remotecommand.NewAttach(req, config, stdin, stdout, stderr, tty)
return attach.Execute()
func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
exec, err := remotecommand.NewExecutor(config, method, url)
if err != nil {
return err
}
return exec.Stream(stdin, stdout, stderr, tty)
}
// AttachOptions declare the arguments accepted by the Exec command
@ -201,7 +205,7 @@ func (p *AttachOptions) Run() error {
SubResource("attach").
Param("container", p.GetContainerName(pod))
return p.Attach.Attach(req, p.Config, stdin, p.Out, p.Err, tty)
return p.Attach.Attach("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty)
}
// GetContainerName returns the name of the container to attach to, with a fallback.

View File

@ -21,6 +21,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"testing"
"github.com/spf13/cobra"
@ -32,12 +33,14 @@ import (
)
type fakeRemoteAttach struct {
req *client.Request
method string
url *url.URL
attachErr error
}
func (f *fakeRemoteAttach) Attach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
f.req = req
func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
f.method = method
f.url = url
return f.attachErr
}
@ -173,10 +176,16 @@ func TestAttach(t *testing.T) {
t.Errorf("%s: Unexpected error: %v", test.name, err)
continue
}
if !test.attachErr && ex.req.URL().Path != test.attachPath {
if test.attachErr {
continue
}
if ex.url.Path != test.attachPath {
t.Errorf("%s: Did not get expected path for exec request", test.name)
continue
}
if ex.method != "POST" {
t.Errorf("%s: Did not get method for attach request: %s", test.name, ex.method)
}
}
}

View File

@ -19,6 +19,7 @@ package cmd
import (
"fmt"
"io"
"net/url"
"os"
"os/signal"
"syscall"
@ -73,15 +74,18 @@ func NewCmdExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) *
// RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing
type RemoteExecutor interface {
Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
}
// DefaultRemoteExecutor is the standard implementation of remote command execution
type DefaultRemoteExecutor struct{}
func (*DefaultRemoteExecutor) Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
executor := remotecommand.New(req, config, command, stdin, stdout, stderr, tty)
return executor.Execute()
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
exec, err := remotecommand.NewExecutor(config, method, url)
if err != nil {
return err
}
return exec.Stream(stdin, stdout, stderr, tty)
}
// ExecOptions declare the arguments accepted by the Exec command
@ -220,6 +224,14 @@ func (p *ExecOptions) Run() error {
Namespace(pod.Namespace).
SubResource("exec").
Param("container", containerName)
req.VersionedParams(&api.PodExecOptions{
Container: containerName,
Command: p.Command,
Stdin: stdin != nil,
Stdout: p.Out != nil,
Stderr: p.Err != nil,
TTY: tty,
}, api.Scheme)
return p.Executor.Execute(req, p.Config, p.Command, stdin, p.Out, p.Err, tty)
return p.Executor.Execute("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty)
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"reflect"
"testing"
@ -33,12 +34,14 @@ import (
)
type fakeRemoteExecutor struct {
req *client.Request
method string
url *url.URL
execErr error
}
func (f *fakeRemoteExecutor) Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
f.req = req
func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
f.method = method
f.url = url
return f.execErr
}
@ -198,10 +201,16 @@ func TestExec(t *testing.T) {
t.Errorf("%s: Unexpected error: %v", test.name, err)
continue
}
if !test.execErr && ex.req.URL().Path != test.execPath {
if test.execErr {
continue
}
if ex.url.Path != test.execPath {
t.Errorf("%s: Did not get expected path for exec request", test.name)
continue
}
if ex.method != "POST" {
t.Errorf("%s: Did not get method for exec request: %s", test.name, ex.method)
}
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cmd
import (
"net/url"
"os"
"os/signal"
@ -25,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/portforward"
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
)
@ -60,13 +62,17 @@ func NewCmdPortForward(f *cmdutil.Factory) *cobra.Command {
}
type portForwarder interface {
ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error
ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error
}
type defaultPortForwarder struct{}
func (*defaultPortForwarder) ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error {
fw, err := portforward.New(req, config, ports, stopChan)
func (*defaultPortForwarder) ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error {
dialer, err := remotecommand.NewExecutor(config, method, url)
if err != nil {
return err
}
fw, err := portforward.New(dialer, ports, stopChan)
if err != nil {
return err
}
@ -130,5 +136,5 @@ func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string, fw po
Name(pod.Name).
SubResource("portforward")
return fw.ForwardPorts(req, config, args, stopCh)
return fw.ForwardPorts("POST", req.URL(), config, args, stopCh)
}

View File

@ -19,6 +19,7 @@ package cmd
import (
"fmt"
"net/http"
"net/url"
"testing"
"github.com/spf13/cobra"
@ -30,12 +31,14 @@ import (
)
type fakePortForwarder struct {
req *client.Request
pfErr error
method string
url *url.URL
pfErr error
}
func (f *fakePortForwarder) ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error {
f.req = req
func (f *fakePortForwarder) ForwardPorts(method string, url *url.URL, config *client.Config, ports []string, stopChan <-chan struct{}) error {
f.method = method
f.url = url
return f.pfErr
}
@ -92,12 +95,20 @@ func TestPortForward(t *testing.T) {
if test.pfErr && err != ff.pfErr {
t.Errorf("%s: Unexpected exec error: %v", test.name, err)
}
if !test.pfErr && ff.req.URL().Path != test.pfPath {
t.Errorf("%s: Did not get expected path for portforward request", test.name)
}
if !test.pfErr && err != nil {
t.Errorf("%s: Unexpected error: %v", test.name, err)
}
if test.pfErr {
continue
}
if ff.url.Path != test.pfPath {
t.Errorf("%s: Did not get expected path for portforward request", test.name)
}
if ff.method != "POST" {
t.Errorf("%s: Did not get method for attach request: %s", test.name, ff.method)
}
}
}
@ -154,7 +165,7 @@ func TestPortForwardWithPFlag(t *testing.T) {
if test.pfErr && err != ff.pfErr {
t.Errorf("%s: Unexpected exec error: %v", test.name, err)
}
if !test.pfErr && ff.req.URL().Path != test.pfPath {
if !test.pfErr && ff.url.Path != test.pfPath {
t.Errorf("%s: Did not get expected path for portforward request", test.name)
}
if !test.pfErr && err != nil {

View File

@ -37,6 +37,11 @@ type NewStreamHandler func(Stream) error
// performs no other logic.
func NoOpNewStreamHandler(stream Stream) error { return nil }
// Dialer knows how to open a streaming connection to a server.
type Dialer interface {
Dial() (Connection, error)
}
// UpgradeRoundTripper is a type of http.RoundTripper that is able to upgrade
// HTTP requests to support multiplexed bidirectional streams. After RoundTrip()
// is invoked, if the upgrade is successful, clients may retrieve the upgraded