From 3b21a9901bcd48bb452d3bf1a0cddc90dae142c4 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Mon, 18 Apr 2016 12:54:44 -0400 Subject: [PATCH] Support terminal resizing for exec/attach/run Add support for terminal resizing for exec, attach, and run. Note that for Docker, exec sessions inherit the environment from the primary process, so if the container was created with tty=false, that means the exec session's TERM variable will default to "dumb". Users can override this by setting TERM=xterm (or whatever is appropriate) to get the correct "smart" terminal behavior. --- cmd/hyperkube/kubectl.go | 7 +- cmd/kubectl/app/kubectl.go | 6 +- pkg/api/types.go | 2 + .../remotecommand/remotecommand.go | 41 ++-- .../remotecommand/remotecommand_test.go | 17 +- pkg/client/unversioned/remotecommand/v1.go | 62 ++--- pkg/client/unversioned/remotecommand/v2.go | 139 +++++++---- .../unversioned/remotecommand/v2_test.go | 228 ++++++++++++++++++ pkg/client/unversioned/remotecommand/v3.go | 108 +++++++++ pkg/kubectl/cmd/attach.go | 54 ++++- pkg/kubectl/cmd/attach_test.go | 3 +- pkg/kubectl/cmd/exec.go | 124 +++++----- pkg/kubectl/cmd/exec_test.go | 3 +- pkg/kubelet/container/resize.go | 46 ++++ pkg/kubelet/container/runtime.go | 5 +- pkg/kubelet/container/testing/fake_runtime.go | 5 +- pkg/kubelet/container/testing/runtime_mock.go | 5 +- pkg/kubelet/dockertools/docker.go | 2 + pkg/kubelet/dockertools/docker_manager.go | 13 +- pkg/kubelet/dockertools/exec.go | 19 +- pkg/kubelet/dockertools/fake_docker_client.go | 14 ++ .../dockertools/instrumented_docker.go | 18 ++ pkg/kubelet/dockertools/kube_docker_client.go | 18 ++ pkg/kubelet/kubelet.go | 11 +- pkg/kubelet/kubelet_test.go | 6 +- pkg/kubelet/lifecycle/handlers.go | 2 +- pkg/kubelet/lifecycle/handlers_test.go | 3 +- pkg/kubelet/prober/prober.go | 2 +- pkg/kubelet/rkt/rkt.go | 9 +- pkg/kubelet/server/remotecommand/attach.go | 5 +- .../{contants.go => constants.go} | 7 +- pkg/kubelet/server/remotecommand/exec.go | 5 +- .../server/remotecommand/httpstream.go | 101 +++++++- pkg/kubelet/server/remotecommand/websocket.go | 77 +++--- pkg/kubelet/server/server.go | 5 +- pkg/kubelet/server/server_test.go | 5 +- pkg/util/term/resize.go | 147 +++++++++++ pkg/util/term/resizeevents.go | 60 +++++ pkg/util/term/resizeevents_windows.go | 57 +++++ pkg/util/term/term.go | 45 +++- test/e2e_node/exec_util.go | 8 +- 41 files changed, 1247 insertions(+), 247 deletions(-) create mode 100644 pkg/client/unversioned/remotecommand/v2_test.go create mode 100644 pkg/client/unversioned/remotecommand/v3.go create mode 100644 pkg/kubelet/container/resize.go rename pkg/kubelet/server/remotecommand/{contants.go => constants.go} (77%) create mode 100644 pkg/util/term/resize.go create mode 100644 pkg/util/term/resizeevents.go create mode 100644 pkg/util/term/resizeevents_windows.go diff --git a/cmd/hyperkube/kubectl.go b/cmd/hyperkube/kubectl.go index 068b2781e0..7e76bd3fde 100644 --- a/cmd/hyperkube/kubectl.go +++ b/cmd/hyperkube/kubectl.go @@ -17,14 +17,15 @@ limitations under the License. package main import ( - "os" - + "github.com/docker/docker/pkg/term" "k8s.io/kubernetes/pkg/kubectl/cmd" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" ) func NewKubectlServer() *Server { - cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr) + // need to use term.StdStreams to get the right IO refs on Windows + stdin, stdout, stderr := term.StdStreams() + cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), stdin, stdout, stderr) localFlags := cmd.LocalFlags() localFlags.SetInterspersed(false) diff --git a/cmd/kubectl/app/kubectl.go b/cmd/kubectl/app/kubectl.go index 27cbfd83b5..455ae55139 100644 --- a/cmd/kubectl/app/kubectl.go +++ b/cmd/kubectl/app/kubectl.go @@ -17,7 +17,7 @@ limitations under the License. package app import ( - "os" + "github.com/docker/docker/pkg/term" "k8s.io/kubernetes/pkg/kubectl/cmd" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" @@ -28,6 +28,8 @@ WARNING: this logic is duplicated, with minor changes, in cmd/hyperkube/kubectl. Any salient changes here will need to be manually reflected in that file. */ func Run() error { - cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr) + // need to use term.StdStreams to get the right IO refs on Windows + stdin, stdout, stderr := term.StdStreams() + cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), stdin, stdout, stderr) return cmd.Execute() } diff --git a/pkg/api/types.go b/pkg/api/types.go index 6eb9a2b15f..e6ceb57900 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -2751,6 +2751,8 @@ const ( StreamTypeData = "data" // Value for streamType header for error stream StreamTypeError = "error" + // Value for streamType header for terminal resize stream + StreamTypeResize = "resize" // Name of header that specifies the port being forwarded PortHeader = "port" diff --git a/pkg/client/unversioned/remotecommand/remotecommand.go b/pkg/client/unversioned/remotecommand/remotecommand.go index d39947d7e1..ff18c4e763 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand.go +++ b/pkg/client/unversioned/remotecommand/remotecommand.go @@ -29,15 +29,28 @@ import ( "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" + "k8s.io/kubernetes/pkg/util/term" ) +// StreamOptions holds information pertaining to the current streaming session: supported stream +// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to +// support terminal resizing. +type StreamOptions struct { + SupportedProtocols []string + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + Tty bool + TerminalSizeQueue term.TerminalSizeQueue +} + // Executor is an interface for transporting shell-style streams. type Executor interface { // Stream initiates the transport of the standard shell streams. It will transport any // non-nil stream to a remote system, and return an error if a problem occurs. If tty // is set, the stderr stream is not used (raw TTY manages stdout and stderr over the // stdout stream). - Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error + Stream(options StreamOptions) error } // StreamExecutor supports the ability to dial an httpstream connection and the ability to @@ -129,14 +142,18 @@ func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, strin return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil } +type streamCreator interface { + CreateStream(headers http.Header) (httpstream.Stream, error) +} + type streamProtocolHandler interface { - stream(httpstream.Connection) error + stream(conn streamCreator) error } // Stream opens a protocol streamer to the server and streams until a client closes // the connection or the server disconnects. -func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - conn, protocol, err := e.Dial(supportedProtocols...) +func (e *streamExecutor) Stream(options StreamOptions) error { + conn, protocol, err := e.Dial(options.SupportedProtocols...) if err != nil { return err } @@ -145,23 +162,15 @@ func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, st var streamer streamProtocolHandler switch protocol { + case remotecommand.StreamProtocolV3Name: + streamer = newStreamProtocolV3(options) case remotecommand.StreamProtocolV2Name: - streamer = &streamProtocolV2{ - stdin: stdin, - stdout: stdout, - stderr: stderr, - tty: tty, - } + streamer = newStreamProtocolV2(options) case "": glog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name) fallthrough case remotecommand.StreamProtocolV1Name: - streamer = &streamProtocolV1{ - stdin: stdin, - stdout: stdout, - stderr: stderr, - tty: tty, - } + streamer = newStreamProtocolV1(options) } return streamer.stream(conn) diff --git a/pkg/client/unversioned/remotecommand/remotecommand_test.go b/pkg/client/unversioned/remotecommand/remotecommand_test.go index 9c445c26f7..b228df2e7f 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand_test.go +++ b/pkg/client/unversioned/remotecommand/remotecommand_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/httpstream" + "k8s.io/kubernetes/pkg/util/term" ) type fakeExecutor struct { @@ -52,11 +53,11 @@ type fakeExecutor struct { exec bool } -func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { return ex.run(name, uid, container, cmd, in, out, err, tty) } -func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { return ex.run(name, uid, container, nil, in, out, err, tty) } @@ -253,7 +254,13 @@ func TestStream(t *testing.T) { t.Errorf("%s: unexpected error: %v", name, err) continue } - err = e.Stream(testCase.ClientProtocols, streamIn, streamOut, streamErr, testCase.Tty) + err = e.Stream(StreamOptions{ + SupportedProtocols: testCase.ClientProtocols, + Stdin: streamIn, + Stdout: streamOut, + Stderr: streamErr, + Tty: testCase.Tty, + }) hasErr := err != nil if len(testCase.Error) > 0 { @@ -277,13 +284,13 @@ func TestStream(t *testing.T) { if len(testCase.Stdout) > 0 { if e, a := strings.Repeat(testCase.Stdout, testCase.MessageCount), localOut; e != a.String() { - t.Errorf("%s: expected stdout data '%s', got '%s'", name, e, a) + t.Errorf("%s: expected stdout data %q, got %q", name, e, a) } } if testCase.Stderr != "" { if e, a := strings.Repeat(testCase.Stderr, testCase.MessageCount), localErr; e != a.String() { - t.Errorf("%s: expected stderr data '%s', got '%s'", name, e, a) + t.Errorf("%s: expected stderr data %q, got %q", name, e, a) } } diff --git a/pkg/client/unversioned/remotecommand/v1.go b/pkg/client/unversioned/remotecommand/v1.go index 2fa5b2e911..11d5d7b595 100644 --- a/pkg/client/unversioned/remotecommand/v1.go +++ b/pkg/client/unversioned/remotecommand/v1.go @@ -28,19 +28,27 @@ import ( ) // streamProtocolV1 implements the first version of the streaming exec & attach -// protocol. This version has some bugs, such as not being able to detecte when +// protocol. This version has some bugs, such as not being able to detect when // non-interactive stdin data has ended. See http://issues.k8s.io/13394 and // http://issues.k8s.io/13395 for more details. type streamProtocolV1 struct { - stdin io.Reader - stdout io.Writer - stderr io.Writer - tty bool + StreamOptions + + errorStream httpstream.Stream + remoteStdin httpstream.Stream + remoteStdout httpstream.Stream + remoteStderr httpstream.Stream } var _ streamProtocolHandler = &streamProtocolV1{} -func (e *streamProtocolV1) stream(conn httpstream.Connection) error { +func newStreamProtocolV1(options StreamOptions) streamProtocolHandler { + return &streamProtocolV1{ + StreamOptions: options, + } +} + +func (p *streamProtocolV1) stream(conn streamCreator) error { doneChan := make(chan struct{}, 2) errorChan := make(chan error) @@ -55,19 +63,15 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error { } } - var ( - err error - errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream - ) - // set up all the streams first + var err error headers := http.Header{} headers.Set(api.StreamType, api.StreamTypeError) - errorStream, err = conn.CreateStream(headers) + p.errorStream, err = conn.CreateStream(headers) if err != nil { return err } - defer errorStream.Reset() + defer p.errorStream.Reset() // Create all the streams first, then start the copy goroutines. The server doesn't start its copy // goroutines until it's received all of the streams. If the client creates the stdin stream and @@ -76,38 +80,38 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error { // getting processed because the server hasn't started its copying, and it won't do that until it // gets all the streams. By creating all the streams first, we ensure that the server is ready to // process data before the client starts sending any. See https://issues.k8s.io/16373 for more info. - if e.stdin != nil { + if p.Stdin != nil { headers.Set(api.StreamType, api.StreamTypeStdin) - remoteStdin, err = conn.CreateStream(headers) + p.remoteStdin, err = conn.CreateStream(headers) if err != nil { return err } - defer remoteStdin.Reset() + defer p.remoteStdin.Reset() } - if e.stdout != nil { + if p.Stdout != nil { headers.Set(api.StreamType, api.StreamTypeStdout) - remoteStdout, err = conn.CreateStream(headers) + p.remoteStdout, err = conn.CreateStream(headers) if err != nil { return err } - defer remoteStdout.Reset() + defer p.remoteStdout.Reset() } - if e.stderr != nil && !e.tty { + if p.Stderr != nil && !p.Tty { headers.Set(api.StreamType, api.StreamTypeStderr) - remoteStderr, err = conn.CreateStream(headers) + p.remoteStderr, err = conn.CreateStream(headers) if err != nil { return err } - defer remoteStderr.Reset() + defer p.remoteStderr.Reset() } // now that all the streams have been created, proceed with reading & copying // always read from errorStream go func() { - message, err := ioutil.ReadAll(errorStream) + message, err := ioutil.ReadAll(p.errorStream) if err != nil && err != io.EOF { errorChan <- fmt.Errorf("Error reading from error stream: %s", err) return @@ -118,25 +122,25 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error { } }() - if e.stdin != nil { + if p.Stdin != nil { // TODO this goroutine will never exit cleanly (the io.Copy never unblocks) // because stdin is not closed until the process exits. If we try to call // stdin.Close(), it returns no error but doesn't unblock the copy. It will // exit when the process exits, instead. - go cp(api.StreamTypeStdin, remoteStdin, e.stdin) + go cp(api.StreamTypeStdin, p.remoteStdin, p.Stdin) } waitCount := 0 completedStreams := 0 - if e.stdout != nil { + if p.Stdout != nil { waitCount++ - go cp(api.StreamTypeStdout, e.stdout, remoteStdout) + go cp(api.StreamTypeStdout, p.Stdout, p.remoteStdout) } - if e.stderr != nil && !e.tty { + if p.Stderr != nil && !p.Tty { waitCount++ - go cp(api.StreamTypeStderr, e.stderr, remoteStderr) + go cp(api.StreamTypeStderr, p.Stderr, p.remoteStderr) } Loop: diff --git a/pkg/client/unversioned/remotecommand/v2.go b/pkg/client/unversioned/remotecommand/v2.go index 66ca9c6486..bcba33bdf7 100644 --- a/pkg/client/unversioned/remotecommand/v2.go +++ b/pkg/client/unversioned/remotecommand/v2.go @@ -24,7 +24,6 @@ import ( "sync" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/runtime" ) @@ -33,63 +32,69 @@ import ( // version is referred to as version 2, even though it is the first actual // numbered version. type streamProtocolV2 struct { - stdin io.Reader - stdout io.Writer - stderr io.Writer - tty bool + StreamOptions + + errorStream io.Reader + remoteStdin io.ReadWriteCloser + remoteStdout io.Reader + remoteStderr io.Reader } var _ streamProtocolHandler = &streamProtocolV2{} -func (e *streamProtocolV2) stream(conn httpstream.Connection) error { - var ( - err error - errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream - ) +func newStreamProtocolV2(options StreamOptions) streamProtocolHandler { + return &streamProtocolV2{ + StreamOptions: options, + } +} +func (p *streamProtocolV2) createStreams(conn streamCreator) error { + var err error headers := http.Header{} - // set up all the streams first // set up error stream - errorChan := make(chan error) headers.Set(api.StreamType, api.StreamTypeError) - errorStream, err = conn.CreateStream(headers) + p.errorStream, err = conn.CreateStream(headers) if err != nil { return err } // set up stdin stream - if e.stdin != nil { + if p.Stdin != nil { headers.Set(api.StreamType, api.StreamTypeStdin) - remoteStdin, err = conn.CreateStream(headers) + p.remoteStdin, err = conn.CreateStream(headers) if err != nil { return err } } // set up stdout stream - if e.stdout != nil { + if p.Stdout != nil { headers.Set(api.StreamType, api.StreamTypeStdout) - remoteStdout, err = conn.CreateStream(headers) + p.remoteStdout, err = conn.CreateStream(headers) if err != nil { return err } } // set up stderr stream - if e.stderr != nil && !e.tty { + if p.Stderr != nil && !p.Tty { headers.Set(api.StreamType, api.StreamTypeStderr) - remoteStderr, err = conn.CreateStream(headers) + p.remoteStderr, err = conn.CreateStream(headers) if err != nil { return err } } + return nil +} - // now that all the streams have been created, proceed with reading & copying +func (p *streamProtocolV2) setupErrorStreamReading() chan error { + errorChan := make(chan error) - // always read from errorStream go func() { - message, err := ioutil.ReadAll(errorStream) + defer runtime.HandleCrash() + + message, err := ioutil.ReadAll(p.errorStream) switch { case err != nil && err != io.EOF: errorChan <- fmt.Errorf("error reading from error stream: %s", err) @@ -101,18 +106,23 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { close(errorChan) }() - var wg sync.WaitGroup - var once sync.Once + return errorChan +} + +func (p *streamProtocolV2) copyStdin() { + if p.Stdin != nil { + var once sync.Once - if e.stdin != nil { // copy from client's stdin to container's stdin go func() { - // if e.stdin is noninteractive, e.g. `echo abc | kubectl exec -i -- cat`, make sure - // we close remoteStdin as soon as the copy from e.stdin to remoteStdin finishes. Otherwise - // the executed command will remain running. - defer once.Do(func() { remoteStdin.Close() }) + defer runtime.HandleCrash() - if _, err := io.Copy(remoteStdin, e.stdin); err != nil { + // if p.stdin is noninteractive, p.g. `echo abc | kubectl exec -i -- cat`, make sure + // we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise + // the executed command will remain running. + defer once.Do(func() { p.remoteStdin.Close() }) + + if _, err := io.Copy(p.remoteStdin, p.Stdin); err != nil { runtime.HandleError(err) } }() @@ -121,6 +131,9 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { // be able to exit interactive sessions cleanly and not leak goroutines or // hang the client's terminal. // + // TODO we aren't using go-dockerclient any more; revisit this to determine if it's still + // required by engine-api. + // // go-dockerclient's current hijack implementation // (https://github.com/fsouza/go-dockerclient/blob/89f3d56d93788dfe85f864a44f85d9738fca0670/client.go#L564) // waits for all three streams (stdin/stdout/stderr) to finish copying @@ -129,35 +142,65 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { // When that happens, we must Close() on our side of remoteStdin, to // allow the copy in hijack to complete, and hijack to return. go func() { - defer once.Do(func() { remoteStdin.Close() }) + defer runtime.HandleCrash() + defer once.Do(func() { p.remoteStdin.Close() }) + // this "copy" doesn't actually read anything - it's just here to wait for // the server to close remoteStdin. - if _, err := io.Copy(ioutil.Discard, remoteStdin); err != nil { + if _, err := io.Copy(ioutil.Discard, p.remoteStdin); err != nil { runtime.HandleError(err) } }() } +} + +func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) { + if p.Stdout == nil { + return + } - if e.stdout != nil { - wg.Add(1) - go func() { - defer wg.Done() - if _, err := io.Copy(e.stdout, remoteStdout); err != nil { - runtime.HandleError(err) - } - }() + wg.Add(1) + go func() { + defer runtime.HandleCrash() + defer wg.Done() + + if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil { + runtime.HandleError(err) + } + }() +} + +func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) { + if p.Stderr == nil || p.Tty { + return } - if e.stderr != nil && !e.tty { - wg.Add(1) - go func() { - defer wg.Done() - if _, err := io.Copy(e.stderr, remoteStderr); err != nil { - runtime.HandleError(err) - } - }() + wg.Add(1) + go func() { + defer runtime.HandleCrash() + defer wg.Done() + + if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil { + runtime.HandleError(err) + } + }() +} + +func (p *streamProtocolV2) stream(conn streamCreator) error { + if err := p.createStreams(conn); err != nil { + return err } + // now that all the streams have been created, proceed with reading & copying + + errorChan := p.setupErrorStreamReading() + + p.copyStdin() + + var wg sync.WaitGroup + p.copyStdout(&wg) + p.copyStderr(&wg) + // we're waiting for stdout/stderr to finish copying wg.Wait() diff --git a/pkg/client/unversioned/remotecommand/v2_test.go b/pkg/client/unversioned/remotecommand/v2_test.go new file mode 100644 index 0000000000..d5e988b1ec --- /dev/null +++ b/pkg/client/unversioned/remotecommand/v2_test.go @@ -0,0 +1,228 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "errors" + "io" + "net/http" + "strings" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/httpstream" + "k8s.io/kubernetes/pkg/util/wait" +) + +type fakeReader struct { + err error +} + +func (r *fakeReader) Read([]byte) (int, error) { return 0, r.err } + +type fakeWriter struct{} + +func (*fakeWriter) Write([]byte) (int, error) { return 0, nil } + +type fakeStreamCreator struct { + created map[string]bool + errors map[string]error +} + +var _ streamCreator = &fakeStreamCreator{} + +func (f *fakeStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, error) { + streamType := headers.Get(api.StreamType) + f.created[streamType] = true + return nil, f.errors[streamType] +} + +func TestV2CreateStreams(t *testing.T) { + tests := []struct { + name string + stdin bool + stdinError error + stdout bool + stdoutError error + stderr bool + stderrError error + errorError error + tty bool + expectError bool + }{ + { + name: "stdin error", + stdin: true, + stdinError: errors.New("stdin error"), + expectError: true, + }, + { + name: "stdout error", + stdout: true, + stdoutError: errors.New("stdout error"), + expectError: true, + }, + { + name: "stderr error", + stderr: true, + stderrError: errors.New("stderr error"), + expectError: true, + }, + { + name: "error stream error", + stdin: true, + stdout: true, + stderr: true, + errorError: errors.New("error stream error"), + expectError: true, + }, + { + name: "no errors", + stdin: true, + stdout: true, + stderr: true, + expectError: false, + }, + { + name: "no errors, stderr & tty set, don't expect stderr", + stdin: true, + stdout: true, + stderr: true, + tty: true, + expectError: false, + }, + } + for _, test := range tests { + conn := &fakeStreamCreator{ + created: make(map[string]bool), + errors: map[string]error{ + api.StreamTypeStdin: test.stdinError, + api.StreamTypeStdout: test.stdoutError, + api.StreamTypeStderr: test.stderrError, + api.StreamTypeError: test.errorError, + }, + } + + opts := StreamOptions{Tty: test.tty} + if test.stdin { + opts.Stdin = &fakeReader{} + } + if test.stdout { + opts.Stdout = &fakeWriter{} + } + if test.stderr { + opts.Stderr = &fakeWriter{} + } + + h := newStreamProtocolV2(opts).(*streamProtocolV2) + err := h.createStreams(conn) + + if test.expectError { + if err == nil { + t.Errorf("%s: expected error", test.name) + continue + } + if e, a := test.stdinError, err; test.stdinError != nil && e != a { + t.Errorf("%s: expected %v, got %v", test.name, e, a) + } + if e, a := test.stdoutError, err; test.stdoutError != nil && e != a { + t.Errorf("%s: expected %v, got %v", test.name, e, a) + } + if e, a := test.stderrError, err; test.stderrError != nil && e != a { + t.Errorf("%s: expected %v, got %v", test.name, e, a) + } + if e, a := test.errorError, err; test.errorError != nil && e != a { + t.Errorf("%s: expected %v, got %v", test.name, e, a) + } + continue + } + + if !test.expectError && err != nil { + t.Errorf("%s: unexpected error: %v", test.name, err) + continue + } + + if test.stdin && !conn.created[api.StreamTypeStdin] { + t.Errorf("%s: expected stdin stream", test.name) + } + if test.stdout && !conn.created[api.StreamTypeStdout] { + t.Errorf("%s: expected stdout stream", test.name) + } + if test.stderr { + if test.tty && conn.created[api.StreamTypeStderr] { + t.Errorf("%s: unexpected stderr stream because tty is set", test.name) + } else if !test.tty && !conn.created[api.StreamTypeStderr] { + t.Errorf("%s: expected stderr stream", test.name) + } + } + if !conn.created[api.StreamTypeError] { + t.Errorf("%s: expected error stream", test.name) + } + + } +} + +func TestV2ErrorStreamReading(t *testing.T) { + tests := []struct { + name string + stream io.Reader + expectedError error + }{ + { + name: "error reading from stream", + stream: &fakeReader{errors.New("foo")}, + expectedError: errors.New("error reading from error stream: foo"), + }, + { + name: "stream returns an error", + stream: strings.NewReader("some error"), + expectedError: errors.New("error executing remote command: some error"), + }, + } + + for _, test := range tests { + h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2) + h.errorStream = test.stream + + ch := h.setupErrorStreamReading() + if ch == nil { + t.Fatalf("%s: unexpected nil channel", test.name) + } + + var err error + select { + case err = <-ch: + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("%s: timed out", test.name) + } + + if test.expectedError != nil { + if err == nil { + t.Errorf("%s: expected an error", test.name) + } else if e, a := test.expectedError, err; e.Error() != a.Error() { + t.Errorf("%s: expected %q, got %q", test.name, e, a) + } + continue + } + + if test.expectedError == nil && err != nil { + t.Errorf("%s: unexpected error: %v", test.name, err) + continue + } + } +} diff --git a/pkg/client/unversioned/remotecommand/v3.go b/pkg/client/unversioned/remotecommand/v3.go new file mode 100644 index 0000000000..7b46f940ef --- /dev/null +++ b/pkg/client/unversioned/remotecommand/v3.go @@ -0,0 +1,108 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "encoding/json" + "io" + "net/http" + "sync" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/runtime" +) + +// streamProtocolV3 implements version 3 of the streaming protocol for attach +// and exec. This version adds support for resizing the container's terminal. +type streamProtocolV3 struct { + *streamProtocolV2 + + resizeStream io.Writer +} + +var _ streamProtocolHandler = &streamProtocolV3{} + +func newStreamProtocolV3(options StreamOptions) streamProtocolHandler { + return &streamProtocolV3{ + streamProtocolV2: newStreamProtocolV2(options).(*streamProtocolV2), + } +} + +func (p *streamProtocolV3) createStreams(conn streamCreator) error { + // set up the streams from v2 + if err := p.streamProtocolV2.createStreams(conn); err != nil { + return err + } + + // set up resize stream + if p.Tty { + headers := http.Header{} + headers.Set(api.StreamType, api.StreamTypeResize) + var err error + p.resizeStream, err = conn.CreateStream(headers) + if err != nil { + return err + } + } + + return nil +} + +func (p *streamProtocolV3) handleResizes() { + if p.resizeStream == nil { + return + } + + go func() { + defer runtime.HandleCrash() + + encoder := json.NewEncoder(p.resizeStream) + for { + size := p.TerminalSizeQueue.Next() + if size == nil { + return + } + if err := encoder.Encode(&size); err != nil { + runtime.HandleError(err) + } + } + }() +} + +func (p *streamProtocolV3) stream(conn streamCreator) error { + if err := p.createStreams(conn); err != nil { + return err + } + + // now that all the streams have been created, proceed with reading & copying + + errorChan := p.setupErrorStreamReading() + + p.handleResizes() + + p.copyStdin() + + var wg sync.WaitGroup + p.copyStdout(&wg) + p.copyStderr(&wg) + + // we're waiting for stdout/stderr to finish copying + wg.Wait() + + // waits for errorStream to finish reading with an error or nil + return <-errorChan +} diff --git a/pkg/kubectl/cmd/attach.go b/pkg/kubectl/cmd/attach.go index 4fb6b1006d..2a2c76ed18 100644 --- a/pkg/kubectl/cmd/attach.go +++ b/pkg/kubectl/cmd/attach.go @@ -77,18 +77,25 @@ func NewCmdAttach(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) // RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing type RemoteAttach interface { - Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error + Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error } // DefaultRemoteAttach is the standard implementation of attaching type DefaultRemoteAttach struct{} -func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { +func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error { exec, err := remotecommand.NewExecutor(config, method, url) if err != nil { return err } - return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty) + return exec.Stream(remotecommand.StreamOptions{ + SupportedProtocols: remotecommandserver.SupportedStreamingProtocols, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + TerminalSizeQueue: terminalSizeQueue, + }) } // AttachOptions declare the arguments accepted by the Exec command @@ -182,7 +189,10 @@ func (p *AttachOptions) Run() error { pod := p.Pod // ensure we can recover the terminal while attached - t := term.TTY{Parent: p.InterruptParent} + t := term.TTY{ + Parent: p.InterruptParent, + Out: p.Out, + } // check for TTY tty := p.TTY @@ -196,17 +206,41 @@ func (p *AttachOptions) Run() error { } if p.Stdin { t.In = p.In - if tty && !t.IsTerminal() { + if tty && !t.IsTerminalIn() { tty = false fmt.Fprintln(p.Err, "Unable to use a TTY - input is not a terminal or the right kind of file") } + } else { + p.In = nil } t.Raw = tty - fn := func() error { - if tty { - fmt.Fprintln(p.Out, "\nHit enter for command prompt") + // save p.Err so we can print the command prompt message below + stderr := p.Err + + var sizeQueue term.TerminalSizeQueue + if tty { + if size := t.GetSize(); size != nil { + // fake resizing +1 and then back to normal so that attach-detach-reattach will result in the + // screen being redrawn + sizePlusOne := *size + sizePlusOne.Width++ + sizePlusOne.Height++ + + // this call spawns a goroutine to monitor/update the terminal size + sizeQueue = t.MonitorSize(&sizePlusOne, size) } + + // unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is + // true + p.Err = nil + } + + fn := func() error { + if stderr != nil { + fmt.Fprintln(stderr, "If you don't see a command prompt, try pressing enter.") + } + // TODO: consider abstracting into a client invocation or client helper req := p.Client.RESTClient.Post(). Resource("pods"). @@ -215,13 +249,13 @@ func (p *AttachOptions) Run() error { SubResource("attach") req.VersionedParams(&api.PodAttachOptions{ Container: containerToAttach.Name, - Stdin: p.In != nil, + Stdin: p.Stdin, Stdout: p.Out != nil, Stderr: p.Err != nil, TTY: tty, }, api.ParameterCodec) - return p.Attach.Attach("POST", req.URL(), p.Config, p.In, p.Out, p.Err, tty) + return p.Attach.Attach("POST", req.URL(), p.Config, p.In, p.Out, p.Err, tty, sizeQueue) } if err := t.Safe(fn); err != nil { diff --git a/pkg/kubectl/cmd/attach_test.go b/pkg/kubectl/cmd/attach_test.go index 9ebc756b3c..439ccf2c11 100644 --- a/pkg/kubectl/cmd/attach_test.go +++ b/pkg/kubectl/cmd/attach_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/unversioned/fake" + "k8s.io/kubernetes/pkg/util/term" ) type fakeRemoteAttach struct { @@ -40,7 +41,7 @@ type fakeRemoteAttach struct { err error } -func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { +func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error { f.method = method f.url = url return f.err diff --git a/pkg/kubectl/cmd/exec.go b/pkg/kubectl/cmd/exec.go index 83f9cea789..697b59595e 100644 --- a/pkg/kubectl/cmd/exec.go +++ b/pkg/kubectl/cmd/exec.go @@ -20,11 +20,7 @@ import ( "fmt" "io" "net/url" - "os" - "os/signal" - "syscall" - "github.com/docker/docker/pkg/term" "github.com/golang/glog" "github.com/renstrom/dedent" "github.com/spf13/cobra" @@ -34,6 +30,8 @@ import ( "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" + "k8s.io/kubernetes/pkg/util/interrupt" + "k8s.io/kubernetes/pkg/util/term" ) var ( @@ -79,18 +77,25 @@ func NewCmdExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) * // RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing type RemoteExecutor interface { - Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error + Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error } // DefaultRemoteExecutor is the standard implementation of remote command execution type DefaultRemoteExecutor struct{} -func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { +func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error { exec, err := remotecommand.NewExecutor(config, method, url) if err != nil { return err } - return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty) + return exec.Stream(remotecommand.StreamOptions{ + SupportedProtocols: remotecommandserver.SupportedStreamingProtocols, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + TerminalSizeQueue: terminalSizeQueue, + }) } // ExecOptions declare the arguments accepted by the Exec command @@ -102,6 +107,9 @@ type ExecOptions struct { TTY bool Command []string + // InterruptParent, if set, is used to handle interrupts while attached + InterruptParent *interrupt.Handler + In io.Reader Out io.Writer Err io.Writer @@ -186,58 +194,58 @@ func (p *ExecOptions) Run() error { containerName = pod.Spec.Containers[0].Name } - // TODO: refactor with terminal helpers from the edit utility once that is merged - var stdin io.Reader - tty := p.TTY - if p.Stdin { - stdin = p.In - if tty { - if file, ok := stdin.(*os.File); ok { - inFd := file.Fd() - if term.IsTerminal(inFd) { - oldState, err := term.SetRawTerminal(inFd) - if err != nil { - glog.Fatal(err) - } - // this handles a clean exit, where the command finished - defer term.RestoreTerminal(inFd, oldState) - - // SIGINT is handled by term.SetRawTerminal (it runs a goroutine that listens - // for SIGINT and restores the terminal before exiting) - - // this handles SIGTERM - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGTERM) - go func() { - <-sigChan - term.RestoreTerminal(inFd, oldState) - os.Exit(0) - }() - } else { - fmt.Fprintln(p.Err, "STDIN is not a terminal") - } - } else { - tty = false - fmt.Fprintln(p.Err, "Unable to use a TTY - input is not the right kind of file") - } - } + // ensure we can recover the terminal while attached + t := term.TTY{ + Parent: p.InterruptParent, + Out: p.Out, } - // TODO: consider abstracting into a client invocation or client helper - req := p.Client.RESTClient.Post(). - Resource("pods"). - Name(pod.Name). - Namespace(pod.Namespace). - SubResource("exec"). - Param("container", containerName) - req.VersionedParams(&api.PodExecOptions{ - Container: containerName, - Command: p.Command, - Stdin: stdin != nil, - Stdout: p.Out != nil, - Stderr: p.Err != nil, - TTY: tty, - }, api.ParameterCodec) + // check for TTY + tty := p.TTY + if p.Stdin { + t.In = p.In + if tty && !t.IsTerminalIn() { + tty = false + fmt.Fprintln(p.Err, "Unable to use a TTY - input is not a terminal or the right kind of file") + } + } else { + p.In = nil + } + t.Raw = tty - return p.Executor.Execute("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty) + var sizeQueue term.TerminalSizeQueue + if tty { + // this call spawns a goroutine to monitor/update the terminal size + sizeQueue = t.MonitorSize(t.GetSize()) + + // unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is + // true + p.Err = nil + } + + fn := func() error { + // TODO: consider abstracting into a client invocation or client helper + req := p.Client.RESTClient.Post(). + Resource("pods"). + Name(pod.Name). + Namespace(pod.Namespace). + SubResource("exec"). + Param("container", containerName) + req.VersionedParams(&api.PodExecOptions{ + Container: containerName, + Command: p.Command, + Stdin: p.Stdin, + Stdout: p.Out != nil, + Stderr: p.Err != nil, + TTY: tty, + }, api.ParameterCodec) + + return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.Err, tty, sizeQueue) + } + + if err := t.Safe(fn); err != nil { + return err + } + + return nil } diff --git a/pkg/kubectl/cmd/exec_test.go b/pkg/kubectl/cmd/exec_test.go index 8a4ed3479f..8a0dc536b3 100644 --- a/pkg/kubectl/cmd/exec_test.go +++ b/pkg/kubectl/cmd/exec_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/unversioned/fake" + "k8s.io/kubernetes/pkg/util/term" ) type fakeRemoteExecutor struct { @@ -40,7 +41,7 @@ type fakeRemoteExecutor struct { execErr error } -func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { +func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error { f.method = method f.url = url return f.execErr diff --git a/pkg/kubelet/container/resize.go b/pkg/kubelet/container/resize.go new file mode 100644 index 0000000000..1686031777 --- /dev/null +++ b/pkg/kubelet/container/resize.go @@ -0,0 +1,46 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/term" +) + +// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each +// term.Size received from the channel. The resize channel must be closed elsewhere to stop the +// goroutine. +func HandleResizing(resize <-chan term.Size, resizeFunc func(size term.Size)) { + if resize == nil { + return + } + + go func() { + defer runtime.HandleCrash() + + for { + size, ok := <-resize + if !ok { + return + } + if size.Height < 1 || size.Width < 1 { + continue + } + resizeFunc(size) + } + }() +} diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index df06a737b7..1f0bd1f57e 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -126,7 +127,7 @@ type Runtime interface { } type ContainerAttacher interface { - AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) (err error) + AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) } // CommandRunner encapsulates the command runner interfaces for testability. @@ -134,7 +135,7 @@ type ContainerCommandRunner interface { // Runs the command in the container of the specified pod using nsenter. // Attaches the processes stdin, stdout, and stderr. Optionally uses a // tty. - ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error + ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error // Forward the specified port from the specified pod to the stream. PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error } diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index 3612f555bf..36cc00b142 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -27,6 +27,7 @@ import ( . "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -273,7 +274,7 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS return &status, f.Err } -func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { f.Lock() defer f.Unlock() @@ -281,7 +282,7 @@ func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, std return f.Err } -func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { f.Lock() defer f.Unlock() diff --git a/pkg/kubelet/container/testing/runtime_mock.go b/pkg/kubelet/container/testing/runtime_mock.go index 43dd2a9573..a50db35984 100644 --- a/pkg/kubelet/container/testing/runtime_mock.go +++ b/pkg/kubelet/container/testing/runtime_mock.go @@ -24,6 +24,7 @@ import ( . "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -88,12 +89,12 @@ func (r *Mock) GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, return args.Get(0).(*PodStatus), args.Error(1) } -func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { args := r.Called(containerID, cmd, stdin, stdout, stderr, tty) return args.Error(0) } -func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { args := r.Called(containerID, stdin, stdout, stderr, tty) return args.Error(0) } diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 0eb4631704..5ec2f6be4a 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -77,6 +77,8 @@ type DockerInterface interface { StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error InspectExec(id string) (*dockertypes.ContainerExecInspect, error) AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error + ResizeContainerTTY(id string, height, width int) error + ResizeExecTTY(id string, height, width int) error } // KubeletContainerName encapsulates a pod name and a Kubernetes container name. diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 65287de058..dc4c582f39 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -63,6 +63,7 @@ import ( utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" utilstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/util/term" ) const ( @@ -1098,7 +1099,7 @@ func (d *dockerExitError) ExitStatus() int { } // ExecInContainer runs the command inside the container identified by containerID. -func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { if dm.execHandler == nil { return errors.New("unable to exec without an exec handler") } @@ -1111,10 +1112,16 @@ func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, return fmt.Errorf("container not running (%s)", container.ID) } - return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty) + return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty, resize) } -func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { + // Have to start this before the call to client.AttachToContainer because client.AttachToContainer is a blocking + // call :-( Otherwise, resize events don't get processed and the terminal never resizes. + kubecontainer.HandleResizing(resize, func(size term.Size) { + dm.client.ResizeContainerTTY(containerID.ID, int(size.Height), int(size.Width)) + }) + // TODO(random-liu): Do we really use the *Logs* field here? opts := dockertypes.ContainerAttachOptions{ Stream: true, diff --git a/pkg/kubelet/dockertools/exec.go b/pkg/kubelet/dockertools/exec.go index b836652888..ce213650b0 100644 --- a/pkg/kubelet/dockertools/exec.go +++ b/pkg/kubelet/dockertools/exec.go @@ -26,18 +26,19 @@ import ( dockertypes "github.com/docker/engine-api/types" "github.com/golang/glog" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/term" ) // ExecHandler knows how to execute a command in a running Docker container. type ExecHandler interface { - ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error + ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error } // NsenterExecHandler executes commands in Docker containers using nsenter. type NsenterExecHandler struct{} // TODO should we support nsenter in a container, running with elevated privs and --pid=host? -func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { nsenter, err := exec.LookPath("nsenter") if err != nil { return fmt.Errorf("exec unavailable - unable to locate nsenter") @@ -61,6 +62,10 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do // make sure to close the stdout stream defer stdout.Close() + kubecontainer.HandleResizing(resize, func(size term.Size) { + term.SetSize(p.Fd(), size) + }) + if stdin != nil { go io.Copy(p, stdin) } @@ -98,7 +103,7 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do // NativeExecHandler executes commands in Docker containers using Docker's exec API. type NativeExecHandler struct{} -func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { createOpts := dockertypes.ExecConfig{ Cmd: cmd, AttachStdin: stdin != nil, @@ -110,6 +115,13 @@ func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *doc if err != nil { return fmt.Errorf("failed to exec in container - Exec setup failed - %v", err) } + + // Have to start this before the call to client.StartExec because client.StartExec is a blocking + // call :-( Otherwise, resize events don't get processed and the terminal never resizes. + kubecontainer.HandleResizing(resize, func(size term.Size) { + client.ResizeExecTTY(execObj.ID, int(size.Height), int(size.Width)) + }) + startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty} streamOpts := StreamOptions{ InputStream: stdin, @@ -121,6 +133,7 @@ func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *doc if err != nil { return err } + ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() count := 0 diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index a9d3c2249b..c6246665ac 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -500,6 +500,20 @@ func (f *FakeDockerClient) updateContainerStatus(id, status string) { } } +func (f *FakeDockerClient) ResizeExecTTY(id string, height, width int) error { + f.Lock() + defer f.Unlock() + f.called = append(f.called, "resize_exec") + return nil +} + +func (f *FakeDockerClient) ResizeContainerTTY(id string, height, width int) error { + f.Lock() + defer f.Unlock() + f.called = append(f.called, "resize_container") + return nil +} + // FakeDockerPuller is a stub implementation of DockerPuller. type FakeDockerPuller struct { sync.Mutex diff --git a/pkg/kubelet/dockertools/instrumented_docker.go b/pkg/kubelet/dockertools/instrumented_docker.go index 3a6e85493b..8503fe2d30 100644 --- a/pkg/kubelet/dockertools/instrumented_docker.go +++ b/pkg/kubelet/dockertools/instrumented_docker.go @@ -213,3 +213,21 @@ func (in instrumentedDockerInterface) ImageHistory(id string) ([]dockertypes.Ima recordError(operation, err) return out, err } + +func (in instrumentedDockerInterface) ResizeExecTTY(id string, height, width int) error { + const operation = "resize_exec" + defer recordOperation(operation, time.Now()) + + err := in.client.ResizeExecTTY(id, height, width) + recordError(operation, err) + return err +} + +func (in instrumentedDockerInterface) ResizeContainerTTY(id string, height, width int) error { + const operation = "resize_container" + defer recordOperation(operation, time.Now()) + + err := in.client.ResizeContainerTTY(id, height, width) + recordError(operation, err) + return err +} diff --git a/pkg/kubelet/dockertools/kube_docker_client.go b/pkg/kubelet/dockertools/kube_docker_client.go index d66fc380cb..9b31b4b6e5 100644 --- a/pkg/kubelet/dockertools/kube_docker_client.go +++ b/pkg/kubelet/dockertools/kube_docker_client.go @@ -454,6 +454,24 @@ func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.Contain return d.holdHijackedConnection(sopts.RawTerminal, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp) } +func (d *kubeDockerClient) ResizeExecTTY(id string, height, width int) error { + ctx, cancel := d.getCancelableContext() + defer cancel() + return d.client.ContainerExecResize(ctx, id, dockertypes.ResizeOptions{ + Height: height, + Width: width, + }) +} + +func (d *kubeDockerClient) ResizeContainerTTY(id string, height, width int) error { + ctx, cancel := d.getCancelableContext() + defer cancel() + return d.client.ContainerResize(ctx, id, dockertypes.ResizeOptions{ + Height: height, + Width: width, + }) +} + // redirectResponseToOutputStream redirect the response stream to stdout and stderr. When tty is true, all stream will // only be redirected to stdout. func (d *kubeDockerClient) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d6f79a5d9c..29a8598c29 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -88,6 +88,7 @@ import ( utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/selinux" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/term" utilvalidation "k8s.io/kubernetes/pkg/util/validation" "k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/wait" @@ -3777,7 +3778,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe var buffer bytes.Buffer output := ioutils.WriteCloserWrapper(&buffer) - err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false) + err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false, nil) if err != nil { return nil, err } @@ -3787,7 +3788,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe // ExecInContainer executes a command in a container, connecting the supplied // stdin/stdout/stderr to the command's IO streams. -func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { podUID = kl.podManager.TranslatePodUID(podUID) container, err := kl.findContainer(podFullName, podUID, containerName) @@ -3797,12 +3798,12 @@ func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, contain if container == nil { return fmt.Errorf("container not found (%q)", containerName) } - return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty) + return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize) } // AttachContainer uses the container runtime to attach the given streams to // the given container. -func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { podUID = kl.podManager.TranslatePodUID(podUID) container, err := kl.findContainer(podFullName, podUID, containerName) @@ -3812,7 +3813,7 @@ func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, contain if container == nil { return fmt.Errorf("container not found (%q)", containerName) } - return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty) + return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize) } // PortForward connects to the pod's port and copies data between the port diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 27642a6b36..8caee89dd4 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -74,6 +74,7 @@ import ( "k8s.io/kubernetes/pkg/util/rand" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" @@ -1079,7 +1080,7 @@ type fakeContainerCommandRunner struct { Stream io.ReadWriteCloser } -func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { f.Cmd = cmd f.ID = id f.Stdin = in @@ -2096,6 +2097,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) { nil, nil, false, + nil, ) if err == nil { t.Fatal("unexpected non-error") @@ -2140,6 +2142,7 @@ func TestExecInContainerNoSuchContainer(t *testing.T) { nil, nil, false, + nil, ) if err == nil { t.Fatal("unexpected non-error") @@ -2200,6 +2203,7 @@ func TestExecInContainer(t *testing.T) { stdout, stderr, tty, + nil, ) if err != nil { t.Fatalf("unexpected error: %s", err) diff --git a/pkg/kubelet/lifecycle/handlers.go b/pkg/kubelet/lifecycle/handlers.go index f3894659f0..6ba3effc6e 100644 --- a/pkg/kubelet/lifecycle/handlers.go +++ b/pkg/kubelet/lifecycle/handlers.go @@ -60,7 +60,7 @@ func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod msg string ) output := ioutils.WriteCloserWrapper(&buffer) - err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false) + err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false, nil) if err != nil { msg := fmt.Sprintf("Exec lifecycle hook (%v) for Container %q in Pod %q failed - %q", handler.Exec.Command, container.Name, format.Pod(pod), buffer.String()) glog.V(1).Infof(msg) diff --git a/pkg/kubelet/lifecycle/handlers_test.go b/pkg/kubelet/lifecycle/handlers_test.go index 1bdd711421..74399a5369 100644 --- a/pkg/kubelet/lifecycle/handlers_test.go +++ b/pkg/kubelet/lifecycle/handlers_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/term" ) func TestResolvePortInt(t *testing.T) { @@ -80,7 +81,7 @@ type fakeContainerCommandRunner struct { ID kubecontainer.ContainerID } -func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { f.Cmd = cmd f.ID = id return nil diff --git a/pkg/kubelet/prober/prober.go b/pkg/kubelet/prober/prober.go index 19e02238c3..c307b2e0e5 100644 --- a/pkg/kubelet/prober/prober.go +++ b/pkg/kubelet/prober/prober.go @@ -228,7 +228,7 @@ func (p *prober) newExecInContainer(container api.Container, containerID kubecon return execInContainer{func() ([]byte, error) { var buffer bytes.Buffer output := ioutils.WriteCloserWrapper(&buffer) - err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false) + err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false, nil) if err != nil { return nil, err } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index ff589c6a47..c6ac08b8f3 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -60,6 +60,7 @@ import ( "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/selinux" utilstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/util/term" utilwait "k8s.io/kubernetes/pkg/util/wait" ) @@ -2007,14 +2008,14 @@ func newRktExitError(e error) error { return e } -func (r *Runtime) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (r *Runtime) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { return fmt.Errorf("unimplemented") } // Note: In rkt, the container ID is in the form of "UUID:appName", where UUID is // the rkt UUID, and appName is the container name. // TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail. -func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { glog.V(4).Infof("Rkt execing in container.") id, err := parseContainerID(containerID) @@ -2035,6 +2036,10 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s // make sure to close the stdout stream defer stdout.Close() + kubecontainer.HandleResizing(resize, func(size term.Size) { + term.SetSize(p.Fd(), size) + }) + if stdin != nil { go io.Copy(p, stdin) } diff --git a/pkg/kubelet/server/remotecommand/attach.go b/pkg/kubelet/server/remotecommand/attach.go index a7a12ea166..372ae54a21 100644 --- a/pkg/kubelet/server/remotecommand/attach.go +++ b/pkg/kubelet/server/remotecommand/attach.go @@ -25,13 +25,14 @@ import ( "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/term" ) // Attacher knows how to attach to a running container in a pod. type Attacher interface { // AttachContainer attaches to the running container in the pod, copying data between in/out/err // and the container's stdin/stdout/stderr. - AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error + AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error } // ServeAttach handles requests to attach to a container. After creating/receiving the required @@ -44,7 +45,7 @@ func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, po } defer ctx.conn.Close() - err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty) + err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan) if err != nil { msg := fmt.Sprintf("error attaching to container: %v", err) runtime.HandleError(errors.New(msg)) diff --git a/pkg/kubelet/server/remotecommand/contants.go b/pkg/kubelet/server/remotecommand/constants.go similarity index 77% rename from pkg/kubelet/server/remotecommand/contants.go rename to pkg/kubelet/server/remotecommand/constants.go index 1ecb072a11..33e0dce29a 100644 --- a/pkg/kubelet/server/remotecommand/contants.go +++ b/pkg/kubelet/server/remotecommand/constants.go @@ -31,6 +31,11 @@ const ( // attachment/execution. It is the second version of the subprotocol and // resolves the issues present in the first version. StreamProtocolV2Name = "v2.channel.k8s.io" + + // The SPDY subprotocol "v3.channel.k8s.io" is used for remote command + // attachment/execution. It is the third version of the subprotocol and + // adds support for resizing container terminals. + StreamProtocolV3Name = "v3.channel.k8s.io" ) -var SupportedStreamingProtocols = []string{StreamProtocolV2Name, StreamProtocolV1Name} +var SupportedStreamingProtocols = []string{StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name} diff --git a/pkg/kubelet/server/remotecommand/exec.go b/pkg/kubelet/server/remotecommand/exec.go index e77081f698..fcc4c230bf 100644 --- a/pkg/kubelet/server/remotecommand/exec.go +++ b/pkg/kubelet/server/remotecommand/exec.go @@ -26,13 +26,14 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/term" ) // Executor knows how to execute a command in a container in a pod. type Executor interface { // ExecInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. - ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error + ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error } // ServeExec handles requests to execute a command in a container. After @@ -48,7 +49,7 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podN cmd := req.URL.Query()[api.ExecCommandParamm] - err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty) + err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan) if err != nil { msg := fmt.Sprintf("error executing command in container: %v", err) runtime.HandleError(errors.New(msg)) diff --git a/pkg/kubelet/server/remotecommand/httpstream.go b/pkg/kubelet/server/remotecommand/httpstream.go index f42b07e611..ccd0c3f007 100644 --- a/pkg/kubelet/server/remotecommand/httpstream.go +++ b/pkg/kubelet/server/remotecommand/httpstream.go @@ -17,6 +17,7 @@ limitations under the License. package remotecommand import ( + "encoding/json" "errors" "fmt" "io" @@ -27,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/wsstream" "github.com/golang/glog" @@ -87,6 +89,8 @@ type context struct { stdoutStream io.WriteCloser stderrStream io.WriteCloser errorStream io.WriteCloser + resizeStream io.ReadCloser + resizeChan chan term.Size tty bool } @@ -118,10 +122,26 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt return nil, false } + var ctx *context + var ok bool if wsstream.IsWebSocketRequest(req) { - return createWebSocketStreams(req, w, opts, idleTimeout) + ctx, ok = createWebSocketStreams(req, w, opts, idleTimeout) + } else { + ctx, ok = createHttpStreamStreams(req, w, opts, supportedStreamProtocols, idleTimeout, streamCreationTimeout) + } + if !ok { + return nil, false } + if ctx.resizeStream != nil { + ctx.resizeChan = make(chan term.Size) + go handleResizeEvents(ctx.resizeStream, ctx.resizeChan) + } + + return ctx, true +} + +func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) { protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols) if err != nil { w.WriteHeader(http.StatusBadRequest) @@ -148,6 +168,8 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt var handler protocolHandler switch protocol { + case StreamProtocolV3Name: + handler = &v3ProtocolHandler{} case StreamProtocolV2Name: handler = &v2ProtocolHandler{} case "": @@ -157,6 +179,10 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt handler = &v1ProtocolHandler{} } + if opts.tty && handler.supportsTerminalResizing() { + opts.expectedStreams++ + } + expired := time.NewTimer(streamCreationTimeout) ctx, err := handler.waitForStreams(streamCh, opts.expectedStreams, expired.C) @@ -167,6 +193,7 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt ctx.conn = conn ctx.tty = opts.tty + return ctx, true } @@ -174,8 +201,61 @@ type protocolHandler interface { // waitForStreams waits for the expected streams or a timeout, returning a // remoteCommandContext if all the streams were received, or an error if not. waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) + // supportsTerminalResizing returns true if the protocol handler supports terminal resizing + supportsTerminalResizing() bool } +// v3ProtocolHandler implements the V3 protocol version for streaming command execution. +type v3ProtocolHandler struct{} + +func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) { + ctx := &context{} + receivedStreams := 0 + replyChan := make(chan struct{}) + stop := make(chan struct{}) + defer close(stop) +WaitForStreams: + for { + select { + case stream := <-streams: + streamType := stream.Headers().Get(api.StreamType) + switch streamType { + case api.StreamTypeError: + ctx.errorStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeStdin: + ctx.stdinStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeStdout: + ctx.stdoutStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeStderr: + ctx.stderrStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeResize: + ctx.resizeStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + default: + runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType)) + } + case <-replyChan: + receivedStreams++ + if receivedStreams == expectedStreams { + break WaitForStreams + } + case <-expired: + // TODO find a way to return the error to the user. Maybe use a separate + // stream to report errors? + return nil, errors.New("timed out waiting for client to create streams") + } + } + + return ctx, nil +} + +// supportsTerminalResizing returns true because v3ProtocolHandler supports it +func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true } + // v2ProtocolHandler implements the V2 protocol version for streaming command execution. type v2ProtocolHandler struct{} @@ -221,6 +301,9 @@ WaitForStreams: return ctx, nil } +// supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it. +func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false } + // v1ProtocolHandler implements the V1 protocol version for streaming command execution. type v1ProtocolHandler struct{} @@ -275,3 +358,19 @@ WaitForStreams: return ctx, nil } + +// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it. +func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false } + +func handleResizeEvents(stream io.Reader, channel chan<- term.Size) { + defer runtime.HandleCrash() + + decoder := json.NewDecoder(stream) + for { + size := term.Size{} + if err := decoder.Decode(&size); err != nil { + break + } + channel <- size + } +} diff --git a/pkg/kubelet/server/remotecommand/websocket.go b/pkg/kubelet/server/remotecommand/websocket.go index d9a0665179..efeb1b6646 100644 --- a/pkg/kubelet/server/remotecommand/websocket.go +++ b/pkg/kubelet/server/remotecommand/websocket.go @@ -17,61 +17,82 @@ limitations under the License. package remotecommand import ( + "fmt" "net/http" "time" "k8s.io/kubernetes/pkg/httplog" + "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wsstream" - - "github.com/golang/glog" ) -// standardShellChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2) -// along with the approximate duplex value. Supported subprotocols are "channel.k8s.io" and -// "base64.channel.k8s.io". -func standardShellChannels(stdin, stdout, stderr bool) []wsstream.ChannelType { - // open three half-duplex channels - channels := []wsstream.ChannelType{wsstream.ReadChannel, wsstream.WriteChannel, wsstream.WriteChannel} - if !stdin { - channels[0] = wsstream.IgnoreChannel - } - if !stdout { - channels[1] = wsstream.IgnoreChannel - } - if !stderr { - channels[2] = wsstream.IgnoreChannel - } +const ( + stdinChannel = iota + stdoutChannel + stderrChannel + errorChannel + resizeChannel +) + +// createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2) +// along with the approximate duplex value. It also creates the error (3) and resize (4) channels. +func createChannels(opts *options) []wsstream.ChannelType { + // open the requested channels, and always open the error channel + channels := make([]wsstream.ChannelType, 5) + channels[stdinChannel] = readChannel(opts.stdin) + channels[stdoutChannel] = writeChannel(opts.stdout) + channels[stderrChannel] = writeChannel(opts.stderr) + channels[errorChannel] = wsstream.WriteChannel + channels[resizeChannel] = wsstream.ReadChannel return channels } -// createWebSocketStreams returns a remoteCommandContext containing the websocket connection and +// readChannel returns wsstream.ReadChannel if real is true, or wsstream.IgnoreChannel. +func readChannel(real bool) wsstream.ChannelType { + if real { + return wsstream.ReadChannel + } + return wsstream.IgnoreChannel +} + +// writeChannel returns wsstream.WriteChannel if real is true, or wsstream.IgnoreChannel. +func writeChannel(real bool) wsstream.ChannelType { + if real { + return wsstream.WriteChannel + } + return wsstream.IgnoreChannel +} + +// createWebSocketStreams returns a context containing the websocket connection and // streams needed to perform an exec or an attach. func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *options, idleTimeout time.Duration) (*context, bool) { - // open the requested channels, and always open the error channel - channels := append(standardShellChannels(opts.stdin, opts.stdout, opts.stderr), wsstream.WriteChannel) + channels := createChannels(opts) conn := wsstream.NewConn(channels...) conn.SetIdleTimeout(idleTimeout) streams, err := conn.Open(httplog.Unlogged(w), req) if err != nil { - glog.Errorf("Unable to upgrade websocket connection: %v", err) + runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err)) return nil, false } + // Send an empty message to the lowest writable channel to notify the client the connection is established // TODO: make generic to SPDY and WebSockets and do it outside of this method? switch { case opts.stdout: - streams[1].Write([]byte{}) + streams[stdoutChannel].Write([]byte{}) case opts.stderr: - streams[2].Write([]byte{}) + streams[stderrChannel].Write([]byte{}) default: - streams[3].Write([]byte{}) + streams[errorChannel].Write([]byte{}) } + return &context{ conn: conn, - stdinStream: streams[0], - stdoutStream: streams[1], - stderrStream: streams[2], - errorStream: streams[3], + stdinStream: streams[stdinChannel], + stdoutStream: streams[stdoutChannel], + stderrStream: streams[stderrChannel], + errorStream: streams[errorChannel], tty: opts.tty, + resizeStream: streams[resizeChannel], }, true } diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index bd24e94338..cfb3600b5b 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -58,6 +58,7 @@ import ( "k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/limitwriter" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -160,8 +161,8 @@ type HostInterface interface { GetRunningPods() ([]*api.Pod, error) GetPodByName(namespace, name string) (*api.Pod, bool) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) - ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error - AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error + ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error + AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error ServeLogs(w http.ResponseWriter, req *http.Request) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 3d38931ac9..1bc04481f5 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -119,11 +120,11 @@ func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, contain return fk.runFunc(podFullName, uid, containerName, cmd) } -func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { return fk.execFunc(name, uid, container, cmd, in, out, err, tty) } -func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { return fk.attachFunc(name, uid, container, in, out, err, tty) } diff --git a/pkg/util/term/resize.go b/pkg/util/term/resize.go new file mode 100644 index 0000000000..155c840d6a --- /dev/null +++ b/pkg/util/term/resize.go @@ -0,0 +1,147 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package term + +import ( + "fmt" + + "github.com/docker/docker/pkg/term" + "k8s.io/kubernetes/pkg/util/runtime" +) + +// Size represents the width and height of a terminal. +type Size struct { + Width uint16 + Height uint16 +} + +// GetSize returns the current size of the user's terminal. If it isn't a terminal, +// nil is returned. +func (t TTY) GetSize() *Size { + if !t.IsTerminalOut() { + return nil + } + return GetSize(t.Out.(fd).Fd()) +} + +// GetSize returns the current size of the terminal associated with fd. +func GetSize(fd uintptr) *Size { + winsize, err := term.GetWinsize(fd) + if err != nil { + runtime.HandleError(fmt.Errorf("unable to get terminal size: %v", err)) + return nil + } + + return &Size{Width: winsize.Width, Height: winsize.Height} +} + +// SetSize sets the terminal size associated with fd. +func SetSize(fd uintptr, size Size) error { + return term.SetWinsize(fd, &term.Winsize{Height: size.Height, Width: size.Width}) +} + +// MonitorSize monitors the terminal's size. It returns a TerminalSizeQueue primed with +// initialSizes, or nil if there's no TTY present. +func (t *TTY) MonitorSize(initialSizes ...*Size) TerminalSizeQueue { + if !t.IsTerminalOut() { + return nil + } + + t.sizeQueue = &sizeQueue{ + t: *t, + // make it buffered so we can send the initial terminal sizes without blocking, prior to starting + // the streaming below + resizeChan: make(chan Size, len(initialSizes)), + stopResizing: make(chan struct{}), + } + + t.sizeQueue.monitorSize(initialSizes...) + + return t.sizeQueue +} + +// TerminalSizeQueue is capable of returning terminal resize events as they occur. +type TerminalSizeQueue interface { + // Next returns the new terminal size after the terminal has been resized. It returns nil when + // monitoring has been stopped. + Next() *Size +} + +// sizeQueue implements TerminalSizeQueue +type sizeQueue struct { + t TTY + // resizeChan receives a Size each time the user's terminal is resized. + resizeChan chan Size + stopResizing chan struct{} +} + +// make sure sizeQueue implements the TerminalSizeQueue interface +var _ TerminalSizeQueue = &sizeQueue{} + +// monitorSize primes resizeChan with initialSizes and then monitors for resize events. With each +// new event, it sends the current terminal size to resizeChan. +func (s *sizeQueue) monitorSize(initialSizes ...*Size) { + // send the initial sizes + for i := range initialSizes { + if initialSizes[i] != nil { + s.resizeChan <- *initialSizes[i] + } + } + + resizeEvents := make(chan Size, 1) + + monitorResizeEvents(s.t.Out.(fd).Fd(), resizeEvents, s.stopResizing) + + // listen for resize events in the background + go func() { + defer runtime.HandleCrash() + + for { + select { + case size, ok := <-resizeEvents: + if !ok { + return + } + + select { + // try to send the size to resizeChan, but don't block + case s.resizeChan <- size: + // send successful + default: + // unable to send / no-op + } + case <-s.stopResizing: + return + } + } + }() +} + +// Next returns the new terminal size after the terminal has been resized. It returns nil when +// monitoring has been stopped. +func (s *sizeQueue) Next() *Size { + size, ok := <-s.resizeChan + if !ok { + return nil + } + return &size +} + +// stop stops the background goroutine that is monitoring for terminal resizes. +func (s *sizeQueue) stop() { + close(s.stopResizing) +} diff --git a/pkg/util/term/resizeevents.go b/pkg/util/term/resizeevents.go new file mode 100644 index 0000000000..70858ed03f --- /dev/null +++ b/pkg/util/term/resizeevents.go @@ -0,0 +1,60 @@ +// +build !windows + +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package term + +import ( + "os" + "os/signal" + "syscall" + + "k8s.io/kubernetes/pkg/util/runtime" +) + +// monitorResizeEvents spawns a goroutine that waits for SIGWINCH signals (these indicate the +// terminal has resized). After receiving a SIGWINCH, this gets the terminal size and tries to send +// it to the resizeEvents channel. The goroutine stops when the stop channel is closed. +func monitorResizeEvents(fd uintptr, resizeEvents chan<- Size, stop chan struct{}) { + go func() { + defer runtime.HandleCrash() + + winch := make(chan os.Signal, 1) + signal.Notify(winch, syscall.SIGWINCH) + defer signal.Stop(winch) + + for { + select { + case <-winch: + size := GetSize(fd) + if size == nil { + return + } + + // try to send size + select { + case resizeEvents <- *size: + // success + default: + // not sent + } + case <-stop: + return + } + } + }() +} diff --git a/pkg/util/term/resizeevents_windows.go b/pkg/util/term/resizeevents_windows.go new file mode 100644 index 0000000000..aa6de728b6 --- /dev/null +++ b/pkg/util/term/resizeevents_windows.go @@ -0,0 +1,57 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package term + +import ( + "time" + + "k8s.io/kubernetes/pkg/util/runtime" +) + +// monitorResizeEvents spawns a goroutine that periodically gets the terminal size and tries to send +// it to the resizeEvents channel if the size has changed. The goroutine stops when the stop channel +// is closed. +func monitorResizeEvents(fd uintptr, resizeEvents chan<- Size, stop chan struct{}) { + go func() { + defer runtime.HandleCrash() + + var lastSize Size + + for { + // see if we need to stop running + select { + case <-stop: + return + default: + } + + size := GetSize(fd) + if size == nil { + return + } + + if size.Height != lastSize.Height || size.Width != lastSize.Width { + lastSize.Height = size.Height + lastSize.Width = size.Width + resizeEvents <- *size + } + + // sleep to avoid hot looping + time.Sleep(250 * time.Millisecond) + } + }() +} diff --git a/pkg/util/term/term.go b/pkg/util/term/term.go index 68f99774df..14c7c3340a 100644 --- a/pkg/util/term/term.go +++ b/pkg/util/term/term.go @@ -21,17 +21,22 @@ import ( "os" "github.com/docker/docker/pkg/term" + "k8s.io/kubernetes/pkg/util/interrupt" ) // SafeFunc is a function to be invoked by TTY. type SafeFunc func() error -// TTY helps invoke a function and preserve the state of the terminal, even if the -// process is terminated during execution. +// TTY helps invoke a function and preserve the state of the terminal, even if the process is +// terminated during execution. It also provides support for terminal resizing for remote command +// execution/attachment. type TTY struct { - // In is a reader to check for a terminal. + // In is a reader representing stdin. It is a required field. In io.Reader + // Out is a writer representing stdout. It must be set to support terminal resizing. It is an + // optional field. + Out io.Writer // Raw is true if the terminal should be set raw. Raw bool // TryDev indicates the TTY should try to open /dev/tty if the provided input @@ -41,6 +46,10 @@ type TTY struct { // it will be invoked after the terminal state is restored. If it is not provided, // a signal received during the TTY will result in os.Exit(0) being invoked. Parent *interrupt.Handler + + // sizeQueue is set after a call to MonitorSize() and is used to monitor SIGWINCH signals when the + // user's terminal resizes. + sizeQueue *sizeQueue } // fd returns a file descriptor for a given object. @@ -48,10 +57,22 @@ type fd interface { Fd() uintptr } -// IsTerminal returns true if the provided input is a terminal. Does not check /dev/tty +// IsTerminalIn returns true if t.In is a terminal. Does not check /dev/tty // even if TryDev is set. -func (t TTY) IsTerminal() bool { - return IsTerminal(t.In) +func (t TTY) IsTerminalIn() bool { + return isTerminal(t.In) +} + +// IsTerminalOut returns true if t.Out is a terminal. Does not check /dev/tty +// even if TryDev is set. +func (t TTY) IsTerminalOut() bool { + return isTerminal(t.Out) +} + +// isTerminal returns whether the passed object is a terminal or not +func isTerminal(i interface{}) bool { + file, ok := i.(fd) + return ok && term.IsTerminal(file.Fd()) } // Safe invokes the provided function and will attempt to ensure that when the @@ -90,11 +111,11 @@ func (t TTY) Safe(fn SafeFunc) error { if err != nil { return err } - return interrupt.Chain(t.Parent, func() { term.RestoreTerminal(inFd, state) }).Run(fn) -} + return interrupt.Chain(t.Parent, func() { + if t.sizeQueue != nil { + t.sizeQueue.stop() + } -// IsTerminal returns whether the passed io.Reader is a terminal or not -func IsTerminal(r io.Reader) bool { - file, ok := r.(fd) - return ok && term.IsTerminal(file.Fd()) + term.RestoreTerminal(inFd, state) + }).Run(fn) } diff --git a/test/e2e_node/exec_util.go b/test/e2e_node/exec_util.go index 31afc64a68..1986091cda 100644 --- a/test/e2e_node/exec_util.go +++ b/test/e2e_node/exec_util.go @@ -34,7 +34,13 @@ func execute(method string, url *url.URL, config *restclient.Config, stdin io.Re if err != nil { return err } - return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty) + return exec.Stream(remotecommand.StreamOptions{ + SupportedProtocols: remotecommandserver.SupportedStreamingProtocols, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + }) } func execCommandInContainer(config *restclient.Config, c *client.Client, ns, podName, containerName string, cmd []string) (string, error) {