diff --git a/cmd/hyperkube/kubectl.go b/cmd/hyperkube/kubectl.go index 068b2781e0..7e76bd3fde 100644 --- a/cmd/hyperkube/kubectl.go +++ b/cmd/hyperkube/kubectl.go @@ -17,14 +17,15 @@ limitations under the License. package main import ( - "os" - + "github.com/docker/docker/pkg/term" "k8s.io/kubernetes/pkg/kubectl/cmd" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" ) func NewKubectlServer() *Server { - cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr) + // need to use term.StdStreams to get the right IO refs on Windows + stdin, stdout, stderr := term.StdStreams() + cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), stdin, stdout, stderr) localFlags := cmd.LocalFlags() localFlags.SetInterspersed(false) diff --git a/cmd/kubectl/app/kubectl.go b/cmd/kubectl/app/kubectl.go index 27cbfd83b5..455ae55139 100644 --- a/cmd/kubectl/app/kubectl.go +++ b/cmd/kubectl/app/kubectl.go @@ -17,7 +17,7 @@ limitations under the License. package app import ( - "os" + "github.com/docker/docker/pkg/term" "k8s.io/kubernetes/pkg/kubectl/cmd" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" @@ -28,6 +28,8 @@ WARNING: this logic is duplicated, with minor changes, in cmd/hyperkube/kubectl. Any salient changes here will need to be manually reflected in that file. */ func Run() error { - cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr) + // need to use term.StdStreams to get the right IO refs on Windows + stdin, stdout, stderr := term.StdStreams() + cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), stdin, stdout, stderr) return cmd.Execute() } diff --git a/pkg/api/types.go b/pkg/api/types.go index 6eb9a2b15f..e6ceb57900 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -2751,6 +2751,8 @@ const ( StreamTypeData = "data" // Value for streamType header for error stream StreamTypeError = "error" + // Value for streamType header for terminal resize stream + StreamTypeResize = "resize" // Name of header that specifies the port being forwarded PortHeader = "port" diff --git a/pkg/client/unversioned/remotecommand/remotecommand.go b/pkg/client/unversioned/remotecommand/remotecommand.go index d39947d7e1..ff18c4e763 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand.go +++ b/pkg/client/unversioned/remotecommand/remotecommand.go @@ -29,15 +29,28 @@ import ( "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" + "k8s.io/kubernetes/pkg/util/term" ) +// StreamOptions holds information pertaining to the current streaming session: supported stream +// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to +// support terminal resizing. +type StreamOptions struct { + SupportedProtocols []string + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + Tty bool + TerminalSizeQueue term.TerminalSizeQueue +} + // Executor is an interface for transporting shell-style streams. type Executor interface { // Stream initiates the transport of the standard shell streams. It will transport any // non-nil stream to a remote system, and return an error if a problem occurs. If tty // is set, the stderr stream is not used (raw TTY manages stdout and stderr over the // stdout stream). - Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error + Stream(options StreamOptions) error } // StreamExecutor supports the ability to dial an httpstream connection and the ability to @@ -129,14 +142,18 @@ func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, strin return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil } +type streamCreator interface { + CreateStream(headers http.Header) (httpstream.Stream, error) +} + type streamProtocolHandler interface { - stream(httpstream.Connection) error + stream(conn streamCreator) error } // Stream opens a protocol streamer to the server and streams until a client closes // the connection or the server disconnects. -func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { - conn, protocol, err := e.Dial(supportedProtocols...) +func (e *streamExecutor) Stream(options StreamOptions) error { + conn, protocol, err := e.Dial(options.SupportedProtocols...) if err != nil { return err } @@ -145,23 +162,15 @@ func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, st var streamer streamProtocolHandler switch protocol { + case remotecommand.StreamProtocolV3Name: + streamer = newStreamProtocolV3(options) case remotecommand.StreamProtocolV2Name: - streamer = &streamProtocolV2{ - stdin: stdin, - stdout: stdout, - stderr: stderr, - tty: tty, - } + streamer = newStreamProtocolV2(options) case "": glog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name) fallthrough case remotecommand.StreamProtocolV1Name: - streamer = &streamProtocolV1{ - stdin: stdin, - stdout: stdout, - stderr: stderr, - tty: tty, - } + streamer = newStreamProtocolV1(options) } return streamer.stream(conn) diff --git a/pkg/client/unversioned/remotecommand/remotecommand_test.go b/pkg/client/unversioned/remotecommand/remotecommand_test.go index 9c445c26f7..b228df2e7f 100644 --- a/pkg/client/unversioned/remotecommand/remotecommand_test.go +++ b/pkg/client/unversioned/remotecommand/remotecommand_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/httpstream" + "k8s.io/kubernetes/pkg/util/term" ) type fakeExecutor struct { @@ -52,11 +53,11 @@ type fakeExecutor struct { exec bool } -func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { return ex.run(name, uid, container, cmd, in, out, err, tty) } -func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { return ex.run(name, uid, container, nil, in, out, err, tty) } @@ -253,7 +254,13 @@ func TestStream(t *testing.T) { t.Errorf("%s: unexpected error: %v", name, err) continue } - err = e.Stream(testCase.ClientProtocols, streamIn, streamOut, streamErr, testCase.Tty) + err = e.Stream(StreamOptions{ + SupportedProtocols: testCase.ClientProtocols, + Stdin: streamIn, + Stdout: streamOut, + Stderr: streamErr, + Tty: testCase.Tty, + }) hasErr := err != nil if len(testCase.Error) > 0 { @@ -277,13 +284,13 @@ func TestStream(t *testing.T) { if len(testCase.Stdout) > 0 { if e, a := strings.Repeat(testCase.Stdout, testCase.MessageCount), localOut; e != a.String() { - t.Errorf("%s: expected stdout data '%s', got '%s'", name, e, a) + t.Errorf("%s: expected stdout data %q, got %q", name, e, a) } } if testCase.Stderr != "" { if e, a := strings.Repeat(testCase.Stderr, testCase.MessageCount), localErr; e != a.String() { - t.Errorf("%s: expected stderr data '%s', got '%s'", name, e, a) + t.Errorf("%s: expected stderr data %q, got %q", name, e, a) } } diff --git a/pkg/client/unversioned/remotecommand/v1.go b/pkg/client/unversioned/remotecommand/v1.go index 2fa5b2e911..11d5d7b595 100644 --- a/pkg/client/unversioned/remotecommand/v1.go +++ b/pkg/client/unversioned/remotecommand/v1.go @@ -28,19 +28,27 @@ import ( ) // streamProtocolV1 implements the first version of the streaming exec & attach -// protocol. This version has some bugs, such as not being able to detecte when +// protocol. This version has some bugs, such as not being able to detect when // non-interactive stdin data has ended. See http://issues.k8s.io/13394 and // http://issues.k8s.io/13395 for more details. type streamProtocolV1 struct { - stdin io.Reader - stdout io.Writer - stderr io.Writer - tty bool + StreamOptions + + errorStream httpstream.Stream + remoteStdin httpstream.Stream + remoteStdout httpstream.Stream + remoteStderr httpstream.Stream } var _ streamProtocolHandler = &streamProtocolV1{} -func (e *streamProtocolV1) stream(conn httpstream.Connection) error { +func newStreamProtocolV1(options StreamOptions) streamProtocolHandler { + return &streamProtocolV1{ + StreamOptions: options, + } +} + +func (p *streamProtocolV1) stream(conn streamCreator) error { doneChan := make(chan struct{}, 2) errorChan := make(chan error) @@ -55,19 +63,15 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error { } } - var ( - err error - errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream - ) - // set up all the streams first + var err error headers := http.Header{} headers.Set(api.StreamType, api.StreamTypeError) - errorStream, err = conn.CreateStream(headers) + p.errorStream, err = conn.CreateStream(headers) if err != nil { return err } - defer errorStream.Reset() + defer p.errorStream.Reset() // Create all the streams first, then start the copy goroutines. The server doesn't start its copy // goroutines until it's received all of the streams. If the client creates the stdin stream and @@ -76,38 +80,38 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error { // getting processed because the server hasn't started its copying, and it won't do that until it // gets all the streams. By creating all the streams first, we ensure that the server is ready to // process data before the client starts sending any. See https://issues.k8s.io/16373 for more info. - if e.stdin != nil { + if p.Stdin != nil { headers.Set(api.StreamType, api.StreamTypeStdin) - remoteStdin, err = conn.CreateStream(headers) + p.remoteStdin, err = conn.CreateStream(headers) if err != nil { return err } - defer remoteStdin.Reset() + defer p.remoteStdin.Reset() } - if e.stdout != nil { + if p.Stdout != nil { headers.Set(api.StreamType, api.StreamTypeStdout) - remoteStdout, err = conn.CreateStream(headers) + p.remoteStdout, err = conn.CreateStream(headers) if err != nil { return err } - defer remoteStdout.Reset() + defer p.remoteStdout.Reset() } - if e.stderr != nil && !e.tty { + if p.Stderr != nil && !p.Tty { headers.Set(api.StreamType, api.StreamTypeStderr) - remoteStderr, err = conn.CreateStream(headers) + p.remoteStderr, err = conn.CreateStream(headers) if err != nil { return err } - defer remoteStderr.Reset() + defer p.remoteStderr.Reset() } // now that all the streams have been created, proceed with reading & copying // always read from errorStream go func() { - message, err := ioutil.ReadAll(errorStream) + message, err := ioutil.ReadAll(p.errorStream) if err != nil && err != io.EOF { errorChan <- fmt.Errorf("Error reading from error stream: %s", err) return @@ -118,25 +122,25 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error { } }() - if e.stdin != nil { + if p.Stdin != nil { // TODO this goroutine will never exit cleanly (the io.Copy never unblocks) // because stdin is not closed until the process exits. If we try to call // stdin.Close(), it returns no error but doesn't unblock the copy. It will // exit when the process exits, instead. - go cp(api.StreamTypeStdin, remoteStdin, e.stdin) + go cp(api.StreamTypeStdin, p.remoteStdin, p.Stdin) } waitCount := 0 completedStreams := 0 - if e.stdout != nil { + if p.Stdout != nil { waitCount++ - go cp(api.StreamTypeStdout, e.stdout, remoteStdout) + go cp(api.StreamTypeStdout, p.Stdout, p.remoteStdout) } - if e.stderr != nil && !e.tty { + if p.Stderr != nil && !p.Tty { waitCount++ - go cp(api.StreamTypeStderr, e.stderr, remoteStderr) + go cp(api.StreamTypeStderr, p.Stderr, p.remoteStderr) } Loop: diff --git a/pkg/client/unversioned/remotecommand/v2.go b/pkg/client/unversioned/remotecommand/v2.go index 66ca9c6486..bcba33bdf7 100644 --- a/pkg/client/unversioned/remotecommand/v2.go +++ b/pkg/client/unversioned/remotecommand/v2.go @@ -24,7 +24,6 @@ import ( "sync" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/runtime" ) @@ -33,63 +32,69 @@ import ( // version is referred to as version 2, even though it is the first actual // numbered version. type streamProtocolV2 struct { - stdin io.Reader - stdout io.Writer - stderr io.Writer - tty bool + StreamOptions + + errorStream io.Reader + remoteStdin io.ReadWriteCloser + remoteStdout io.Reader + remoteStderr io.Reader } var _ streamProtocolHandler = &streamProtocolV2{} -func (e *streamProtocolV2) stream(conn httpstream.Connection) error { - var ( - err error - errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream - ) +func newStreamProtocolV2(options StreamOptions) streamProtocolHandler { + return &streamProtocolV2{ + StreamOptions: options, + } +} +func (p *streamProtocolV2) createStreams(conn streamCreator) error { + var err error headers := http.Header{} - // set up all the streams first // set up error stream - errorChan := make(chan error) headers.Set(api.StreamType, api.StreamTypeError) - errorStream, err = conn.CreateStream(headers) + p.errorStream, err = conn.CreateStream(headers) if err != nil { return err } // set up stdin stream - if e.stdin != nil { + if p.Stdin != nil { headers.Set(api.StreamType, api.StreamTypeStdin) - remoteStdin, err = conn.CreateStream(headers) + p.remoteStdin, err = conn.CreateStream(headers) if err != nil { return err } } // set up stdout stream - if e.stdout != nil { + if p.Stdout != nil { headers.Set(api.StreamType, api.StreamTypeStdout) - remoteStdout, err = conn.CreateStream(headers) + p.remoteStdout, err = conn.CreateStream(headers) if err != nil { return err } } // set up stderr stream - if e.stderr != nil && !e.tty { + if p.Stderr != nil && !p.Tty { headers.Set(api.StreamType, api.StreamTypeStderr) - remoteStderr, err = conn.CreateStream(headers) + p.remoteStderr, err = conn.CreateStream(headers) if err != nil { return err } } + return nil +} - // now that all the streams have been created, proceed with reading & copying +func (p *streamProtocolV2) setupErrorStreamReading() chan error { + errorChan := make(chan error) - // always read from errorStream go func() { - message, err := ioutil.ReadAll(errorStream) + defer runtime.HandleCrash() + + message, err := ioutil.ReadAll(p.errorStream) switch { case err != nil && err != io.EOF: errorChan <- fmt.Errorf("error reading from error stream: %s", err) @@ -101,18 +106,23 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { close(errorChan) }() - var wg sync.WaitGroup - var once sync.Once + return errorChan +} + +func (p *streamProtocolV2) copyStdin() { + if p.Stdin != nil { + var once sync.Once - if e.stdin != nil { // copy from client's stdin to container's stdin go func() { - // if e.stdin is noninteractive, e.g. `echo abc | kubectl exec -i -- cat`, make sure - // we close remoteStdin as soon as the copy from e.stdin to remoteStdin finishes. Otherwise - // the executed command will remain running. - defer once.Do(func() { remoteStdin.Close() }) + defer runtime.HandleCrash() - if _, err := io.Copy(remoteStdin, e.stdin); err != nil { + // if p.stdin is noninteractive, p.g. `echo abc | kubectl exec -i -- cat`, make sure + // we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise + // the executed command will remain running. + defer once.Do(func() { p.remoteStdin.Close() }) + + if _, err := io.Copy(p.remoteStdin, p.Stdin); err != nil { runtime.HandleError(err) } }() @@ -121,6 +131,9 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { // be able to exit interactive sessions cleanly and not leak goroutines or // hang the client's terminal. // + // TODO we aren't using go-dockerclient any more; revisit this to determine if it's still + // required by engine-api. + // // go-dockerclient's current hijack implementation // (https://github.com/fsouza/go-dockerclient/blob/89f3d56d93788dfe85f864a44f85d9738fca0670/client.go#L564) // waits for all three streams (stdin/stdout/stderr) to finish copying @@ -129,35 +142,65 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error { // When that happens, we must Close() on our side of remoteStdin, to // allow the copy in hijack to complete, and hijack to return. go func() { - defer once.Do(func() { remoteStdin.Close() }) + defer runtime.HandleCrash() + defer once.Do(func() { p.remoteStdin.Close() }) + // this "copy" doesn't actually read anything - it's just here to wait for // the server to close remoteStdin. - if _, err := io.Copy(ioutil.Discard, remoteStdin); err != nil { + if _, err := io.Copy(ioutil.Discard, p.remoteStdin); err != nil { runtime.HandleError(err) } }() } +} + +func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) { + if p.Stdout == nil { + return + } - if e.stdout != nil { - wg.Add(1) - go func() { - defer wg.Done() - if _, err := io.Copy(e.stdout, remoteStdout); err != nil { - runtime.HandleError(err) - } - }() + wg.Add(1) + go func() { + defer runtime.HandleCrash() + defer wg.Done() + + if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil { + runtime.HandleError(err) + } + }() +} + +func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) { + if p.Stderr == nil || p.Tty { + return } - if e.stderr != nil && !e.tty { - wg.Add(1) - go func() { - defer wg.Done() - if _, err := io.Copy(e.stderr, remoteStderr); err != nil { - runtime.HandleError(err) - } - }() + wg.Add(1) + go func() { + defer runtime.HandleCrash() + defer wg.Done() + + if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil { + runtime.HandleError(err) + } + }() +} + +func (p *streamProtocolV2) stream(conn streamCreator) error { + if err := p.createStreams(conn); err != nil { + return err } + // now that all the streams have been created, proceed with reading & copying + + errorChan := p.setupErrorStreamReading() + + p.copyStdin() + + var wg sync.WaitGroup + p.copyStdout(&wg) + p.copyStderr(&wg) + // we're waiting for stdout/stderr to finish copying wg.Wait() diff --git a/pkg/client/unversioned/remotecommand/v2_test.go b/pkg/client/unversioned/remotecommand/v2_test.go new file mode 100644 index 0000000000..d5e988b1ec --- /dev/null +++ b/pkg/client/unversioned/remotecommand/v2_test.go @@ -0,0 +1,228 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "errors" + "io" + "net/http" + "strings" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/httpstream" + "k8s.io/kubernetes/pkg/util/wait" +) + +type fakeReader struct { + err error +} + +func (r *fakeReader) Read([]byte) (int, error) { return 0, r.err } + +type fakeWriter struct{} + +func (*fakeWriter) Write([]byte) (int, error) { return 0, nil } + +type fakeStreamCreator struct { + created map[string]bool + errors map[string]error +} + +var _ streamCreator = &fakeStreamCreator{} + +func (f *fakeStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, error) { + streamType := headers.Get(api.StreamType) + f.created[streamType] = true + return nil, f.errors[streamType] +} + +func TestV2CreateStreams(t *testing.T) { + tests := []struct { + name string + stdin bool + stdinError error + stdout bool + stdoutError error + stderr bool + stderrError error + errorError error + tty bool + expectError bool + }{ + { + name: "stdin error", + stdin: true, + stdinError: errors.New("stdin error"), + expectError: true, + }, + { + name: "stdout error", + stdout: true, + stdoutError: errors.New("stdout error"), + expectError: true, + }, + { + name: "stderr error", + stderr: true, + stderrError: errors.New("stderr error"), + expectError: true, + }, + { + name: "error stream error", + stdin: true, + stdout: true, + stderr: true, + errorError: errors.New("error stream error"), + expectError: true, + }, + { + name: "no errors", + stdin: true, + stdout: true, + stderr: true, + expectError: false, + }, + { + name: "no errors, stderr & tty set, don't expect stderr", + stdin: true, + stdout: true, + stderr: true, + tty: true, + expectError: false, + }, + } + for _, test := range tests { + conn := &fakeStreamCreator{ + created: make(map[string]bool), + errors: map[string]error{ + api.StreamTypeStdin: test.stdinError, + api.StreamTypeStdout: test.stdoutError, + api.StreamTypeStderr: test.stderrError, + api.StreamTypeError: test.errorError, + }, + } + + opts := StreamOptions{Tty: test.tty} + if test.stdin { + opts.Stdin = &fakeReader{} + } + if test.stdout { + opts.Stdout = &fakeWriter{} + } + if test.stderr { + opts.Stderr = &fakeWriter{} + } + + h := newStreamProtocolV2(opts).(*streamProtocolV2) + err := h.createStreams(conn) + + if test.expectError { + if err == nil { + t.Errorf("%s: expected error", test.name) + continue + } + if e, a := test.stdinError, err; test.stdinError != nil && e != a { + t.Errorf("%s: expected %v, got %v", test.name, e, a) + } + if e, a := test.stdoutError, err; test.stdoutError != nil && e != a { + t.Errorf("%s: expected %v, got %v", test.name, e, a) + } + if e, a := test.stderrError, err; test.stderrError != nil && e != a { + t.Errorf("%s: expected %v, got %v", test.name, e, a) + } + if e, a := test.errorError, err; test.errorError != nil && e != a { + t.Errorf("%s: expected %v, got %v", test.name, e, a) + } + continue + } + + if !test.expectError && err != nil { + t.Errorf("%s: unexpected error: %v", test.name, err) + continue + } + + if test.stdin && !conn.created[api.StreamTypeStdin] { + t.Errorf("%s: expected stdin stream", test.name) + } + if test.stdout && !conn.created[api.StreamTypeStdout] { + t.Errorf("%s: expected stdout stream", test.name) + } + if test.stderr { + if test.tty && conn.created[api.StreamTypeStderr] { + t.Errorf("%s: unexpected stderr stream because tty is set", test.name) + } else if !test.tty && !conn.created[api.StreamTypeStderr] { + t.Errorf("%s: expected stderr stream", test.name) + } + } + if !conn.created[api.StreamTypeError] { + t.Errorf("%s: expected error stream", test.name) + } + + } +} + +func TestV2ErrorStreamReading(t *testing.T) { + tests := []struct { + name string + stream io.Reader + expectedError error + }{ + { + name: "error reading from stream", + stream: &fakeReader{errors.New("foo")}, + expectedError: errors.New("error reading from error stream: foo"), + }, + { + name: "stream returns an error", + stream: strings.NewReader("some error"), + expectedError: errors.New("error executing remote command: some error"), + }, + } + + for _, test := range tests { + h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2) + h.errorStream = test.stream + + ch := h.setupErrorStreamReading() + if ch == nil { + t.Fatalf("%s: unexpected nil channel", test.name) + } + + var err error + select { + case err = <-ch: + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("%s: timed out", test.name) + } + + if test.expectedError != nil { + if err == nil { + t.Errorf("%s: expected an error", test.name) + } else if e, a := test.expectedError, err; e.Error() != a.Error() { + t.Errorf("%s: expected %q, got %q", test.name, e, a) + } + continue + } + + if test.expectedError == nil && err != nil { + t.Errorf("%s: unexpected error: %v", test.name, err) + continue + } + } +} diff --git a/pkg/client/unversioned/remotecommand/v3.go b/pkg/client/unversioned/remotecommand/v3.go new file mode 100644 index 0000000000..7b46f940ef --- /dev/null +++ b/pkg/client/unversioned/remotecommand/v3.go @@ -0,0 +1,108 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotecommand + +import ( + "encoding/json" + "io" + "net/http" + "sync" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/runtime" +) + +// streamProtocolV3 implements version 3 of the streaming protocol for attach +// and exec. This version adds support for resizing the container's terminal. +type streamProtocolV3 struct { + *streamProtocolV2 + + resizeStream io.Writer +} + +var _ streamProtocolHandler = &streamProtocolV3{} + +func newStreamProtocolV3(options StreamOptions) streamProtocolHandler { + return &streamProtocolV3{ + streamProtocolV2: newStreamProtocolV2(options).(*streamProtocolV2), + } +} + +func (p *streamProtocolV3) createStreams(conn streamCreator) error { + // set up the streams from v2 + if err := p.streamProtocolV2.createStreams(conn); err != nil { + return err + } + + // set up resize stream + if p.Tty { + headers := http.Header{} + headers.Set(api.StreamType, api.StreamTypeResize) + var err error + p.resizeStream, err = conn.CreateStream(headers) + if err != nil { + return err + } + } + + return nil +} + +func (p *streamProtocolV3) handleResizes() { + if p.resizeStream == nil { + return + } + + go func() { + defer runtime.HandleCrash() + + encoder := json.NewEncoder(p.resizeStream) + for { + size := p.TerminalSizeQueue.Next() + if size == nil { + return + } + if err := encoder.Encode(&size); err != nil { + runtime.HandleError(err) + } + } + }() +} + +func (p *streamProtocolV3) stream(conn streamCreator) error { + if err := p.createStreams(conn); err != nil { + return err + } + + // now that all the streams have been created, proceed with reading & copying + + errorChan := p.setupErrorStreamReading() + + p.handleResizes() + + p.copyStdin() + + var wg sync.WaitGroup + p.copyStdout(&wg) + p.copyStderr(&wg) + + // we're waiting for stdout/stderr to finish copying + wg.Wait() + + // waits for errorStream to finish reading with an error or nil + return <-errorChan +} diff --git a/pkg/kubectl/cmd/attach.go b/pkg/kubectl/cmd/attach.go index 4fb6b1006d..2a2c76ed18 100644 --- a/pkg/kubectl/cmd/attach.go +++ b/pkg/kubectl/cmd/attach.go @@ -77,18 +77,25 @@ func NewCmdAttach(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) // RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing type RemoteAttach interface { - Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error + Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error } // DefaultRemoteAttach is the standard implementation of attaching type DefaultRemoteAttach struct{} -func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { +func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error { exec, err := remotecommand.NewExecutor(config, method, url) if err != nil { return err } - return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty) + return exec.Stream(remotecommand.StreamOptions{ + SupportedProtocols: remotecommandserver.SupportedStreamingProtocols, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + TerminalSizeQueue: terminalSizeQueue, + }) } // AttachOptions declare the arguments accepted by the Exec command @@ -182,7 +189,10 @@ func (p *AttachOptions) Run() error { pod := p.Pod // ensure we can recover the terminal while attached - t := term.TTY{Parent: p.InterruptParent} + t := term.TTY{ + Parent: p.InterruptParent, + Out: p.Out, + } // check for TTY tty := p.TTY @@ -196,17 +206,41 @@ func (p *AttachOptions) Run() error { } if p.Stdin { t.In = p.In - if tty && !t.IsTerminal() { + if tty && !t.IsTerminalIn() { tty = false fmt.Fprintln(p.Err, "Unable to use a TTY - input is not a terminal or the right kind of file") } + } else { + p.In = nil } t.Raw = tty - fn := func() error { - if tty { - fmt.Fprintln(p.Out, "\nHit enter for command prompt") + // save p.Err so we can print the command prompt message below + stderr := p.Err + + var sizeQueue term.TerminalSizeQueue + if tty { + if size := t.GetSize(); size != nil { + // fake resizing +1 and then back to normal so that attach-detach-reattach will result in the + // screen being redrawn + sizePlusOne := *size + sizePlusOne.Width++ + sizePlusOne.Height++ + + // this call spawns a goroutine to monitor/update the terminal size + sizeQueue = t.MonitorSize(&sizePlusOne, size) } + + // unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is + // true + p.Err = nil + } + + fn := func() error { + if stderr != nil { + fmt.Fprintln(stderr, "If you don't see a command prompt, try pressing enter.") + } + // TODO: consider abstracting into a client invocation or client helper req := p.Client.RESTClient.Post(). Resource("pods"). @@ -215,13 +249,13 @@ func (p *AttachOptions) Run() error { SubResource("attach") req.VersionedParams(&api.PodAttachOptions{ Container: containerToAttach.Name, - Stdin: p.In != nil, + Stdin: p.Stdin, Stdout: p.Out != nil, Stderr: p.Err != nil, TTY: tty, }, api.ParameterCodec) - return p.Attach.Attach("POST", req.URL(), p.Config, p.In, p.Out, p.Err, tty) + return p.Attach.Attach("POST", req.URL(), p.Config, p.In, p.Out, p.Err, tty, sizeQueue) } if err := t.Safe(fn); err != nil { diff --git a/pkg/kubectl/cmd/attach_test.go b/pkg/kubectl/cmd/attach_test.go index 9ebc756b3c..439ccf2c11 100644 --- a/pkg/kubectl/cmd/attach_test.go +++ b/pkg/kubectl/cmd/attach_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/unversioned/fake" + "k8s.io/kubernetes/pkg/util/term" ) type fakeRemoteAttach struct { @@ -40,7 +41,7 @@ type fakeRemoteAttach struct { err error } -func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { +func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error { f.method = method f.url = url return f.err diff --git a/pkg/kubectl/cmd/exec.go b/pkg/kubectl/cmd/exec.go index 83f9cea789..697b59595e 100644 --- a/pkg/kubectl/cmd/exec.go +++ b/pkg/kubectl/cmd/exec.go @@ -20,11 +20,7 @@ import ( "fmt" "io" "net/url" - "os" - "os/signal" - "syscall" - "github.com/docker/docker/pkg/term" "github.com/golang/glog" "github.com/renstrom/dedent" "github.com/spf13/cobra" @@ -34,6 +30,8 @@ import ( "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" + "k8s.io/kubernetes/pkg/util/interrupt" + "k8s.io/kubernetes/pkg/util/term" ) var ( @@ -79,18 +77,25 @@ func NewCmdExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) * // RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing type RemoteExecutor interface { - Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error + Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error } // DefaultRemoteExecutor is the standard implementation of remote command execution type DefaultRemoteExecutor struct{} -func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { +func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error { exec, err := remotecommand.NewExecutor(config, method, url) if err != nil { return err } - return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty) + return exec.Stream(remotecommand.StreamOptions{ + SupportedProtocols: remotecommandserver.SupportedStreamingProtocols, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + TerminalSizeQueue: terminalSizeQueue, + }) } // ExecOptions declare the arguments accepted by the Exec command @@ -102,6 +107,9 @@ type ExecOptions struct { TTY bool Command []string + // InterruptParent, if set, is used to handle interrupts while attached + InterruptParent *interrupt.Handler + In io.Reader Out io.Writer Err io.Writer @@ -186,58 +194,58 @@ func (p *ExecOptions) Run() error { containerName = pod.Spec.Containers[0].Name } - // TODO: refactor with terminal helpers from the edit utility once that is merged - var stdin io.Reader - tty := p.TTY - if p.Stdin { - stdin = p.In - if tty { - if file, ok := stdin.(*os.File); ok { - inFd := file.Fd() - if term.IsTerminal(inFd) { - oldState, err := term.SetRawTerminal(inFd) - if err != nil { - glog.Fatal(err) - } - // this handles a clean exit, where the command finished - defer term.RestoreTerminal(inFd, oldState) - - // SIGINT is handled by term.SetRawTerminal (it runs a goroutine that listens - // for SIGINT and restores the terminal before exiting) - - // this handles SIGTERM - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGTERM) - go func() { - <-sigChan - term.RestoreTerminal(inFd, oldState) - os.Exit(0) - }() - } else { - fmt.Fprintln(p.Err, "STDIN is not a terminal") - } - } else { - tty = false - fmt.Fprintln(p.Err, "Unable to use a TTY - input is not the right kind of file") - } - } + // ensure we can recover the terminal while attached + t := term.TTY{ + Parent: p.InterruptParent, + Out: p.Out, } - // TODO: consider abstracting into a client invocation or client helper - req := p.Client.RESTClient.Post(). - Resource("pods"). - Name(pod.Name). - Namespace(pod.Namespace). - SubResource("exec"). - Param("container", containerName) - req.VersionedParams(&api.PodExecOptions{ - Container: containerName, - Command: p.Command, - Stdin: stdin != nil, - Stdout: p.Out != nil, - Stderr: p.Err != nil, - TTY: tty, - }, api.ParameterCodec) + // check for TTY + tty := p.TTY + if p.Stdin { + t.In = p.In + if tty && !t.IsTerminalIn() { + tty = false + fmt.Fprintln(p.Err, "Unable to use a TTY - input is not a terminal or the right kind of file") + } + } else { + p.In = nil + } + t.Raw = tty - return p.Executor.Execute("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty) + var sizeQueue term.TerminalSizeQueue + if tty { + // this call spawns a goroutine to monitor/update the terminal size + sizeQueue = t.MonitorSize(t.GetSize()) + + // unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is + // true + p.Err = nil + } + + fn := func() error { + // TODO: consider abstracting into a client invocation or client helper + req := p.Client.RESTClient.Post(). + Resource("pods"). + Name(pod.Name). + Namespace(pod.Namespace). + SubResource("exec"). + Param("container", containerName) + req.VersionedParams(&api.PodExecOptions{ + Container: containerName, + Command: p.Command, + Stdin: p.Stdin, + Stdout: p.Out != nil, + Stderr: p.Err != nil, + TTY: tty, + }, api.ParameterCodec) + + return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.Err, tty, sizeQueue) + } + + if err := t.Safe(fn); err != nil { + return err + } + + return nil } diff --git a/pkg/kubectl/cmd/exec_test.go b/pkg/kubectl/cmd/exec_test.go index 8a4ed3479f..8a0dc536b3 100644 --- a/pkg/kubectl/cmd/exec_test.go +++ b/pkg/kubectl/cmd/exec_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/unversioned/fake" + "k8s.io/kubernetes/pkg/util/term" ) type fakeRemoteExecutor struct { @@ -40,7 +41,7 @@ type fakeRemoteExecutor struct { execErr error } -func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error { +func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error { f.method = method f.url = url return f.execErr diff --git a/pkg/kubelet/container/resize.go b/pkg/kubelet/container/resize.go new file mode 100644 index 0000000000..1686031777 --- /dev/null +++ b/pkg/kubelet/container/resize.go @@ -0,0 +1,46 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/term" +) + +// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each +// term.Size received from the channel. The resize channel must be closed elsewhere to stop the +// goroutine. +func HandleResizing(resize <-chan term.Size, resizeFunc func(size term.Size)) { + if resize == nil { + return + } + + go func() { + defer runtime.HandleCrash() + + for { + size, ok := <-resize + if !ok { + return + } + if size.Height < 1 || size.Width < 1 { + continue + } + resizeFunc(size) + } + }() +} diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index df06a737b7..1f0bd1f57e 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -126,7 +127,7 @@ type Runtime interface { } type ContainerAttacher interface { - AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) (err error) + AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) } // CommandRunner encapsulates the command runner interfaces for testability. @@ -134,7 +135,7 @@ type ContainerCommandRunner interface { // Runs the command in the container of the specified pod using nsenter. // Attaches the processes stdin, stdout, and stderr. Optionally uses a // tty. - ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error + ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error // Forward the specified port from the specified pod to the stream. PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error } diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index 3612f555bf..36cc00b142 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -27,6 +27,7 @@ import ( . "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -273,7 +274,7 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS return &status, f.Err } -func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { f.Lock() defer f.Unlock() @@ -281,7 +282,7 @@ func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, std return f.Err } -func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { f.Lock() defer f.Unlock() diff --git a/pkg/kubelet/container/testing/runtime_mock.go b/pkg/kubelet/container/testing/runtime_mock.go index 43dd2a9573..a50db35984 100644 --- a/pkg/kubelet/container/testing/runtime_mock.go +++ b/pkg/kubelet/container/testing/runtime_mock.go @@ -24,6 +24,7 @@ import ( . "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -88,12 +89,12 @@ func (r *Mock) GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, return args.Get(0).(*PodStatus), args.Error(1) } -func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { args := r.Called(containerID, cmd, stdin, stdout, stderr, tty) return args.Error(0) } -func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { args := r.Called(containerID, stdin, stdout, stderr, tty) return args.Error(0) } diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 0eb4631704..5ec2f6be4a 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -77,6 +77,8 @@ type DockerInterface interface { StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error InspectExec(id string) (*dockertypes.ContainerExecInspect, error) AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error + ResizeContainerTTY(id string, height, width int) error + ResizeExecTTY(id string, height, width int) error } // KubeletContainerName encapsulates a pod name and a Kubernetes container name. diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 65287de058..dc4c582f39 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -63,6 +63,7 @@ import ( utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" utilstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/util/term" ) const ( @@ -1098,7 +1099,7 @@ func (d *dockerExitError) ExitStatus() int { } // ExecInContainer runs the command inside the container identified by containerID. -func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { if dm.execHandler == nil { return errors.New("unable to exec without an exec handler") } @@ -1111,10 +1112,16 @@ func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, return fmt.Errorf("container not running (%s)", container.ID) } - return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty) + return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty, resize) } -func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { + // Have to start this before the call to client.AttachToContainer because client.AttachToContainer is a blocking + // call :-( Otherwise, resize events don't get processed and the terminal never resizes. + kubecontainer.HandleResizing(resize, func(size term.Size) { + dm.client.ResizeContainerTTY(containerID.ID, int(size.Height), int(size.Width)) + }) + // TODO(random-liu): Do we really use the *Logs* field here? opts := dockertypes.ContainerAttachOptions{ Stream: true, diff --git a/pkg/kubelet/dockertools/exec.go b/pkg/kubelet/dockertools/exec.go index b836652888..ce213650b0 100644 --- a/pkg/kubelet/dockertools/exec.go +++ b/pkg/kubelet/dockertools/exec.go @@ -26,18 +26,19 @@ import ( dockertypes "github.com/docker/engine-api/types" "github.com/golang/glog" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/term" ) // ExecHandler knows how to execute a command in a running Docker container. type ExecHandler interface { - ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error + ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error } // NsenterExecHandler executes commands in Docker containers using nsenter. type NsenterExecHandler struct{} // TODO should we support nsenter in a container, running with elevated privs and --pid=host? -func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { nsenter, err := exec.LookPath("nsenter") if err != nil { return fmt.Errorf("exec unavailable - unable to locate nsenter") @@ -61,6 +62,10 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do // make sure to close the stdout stream defer stdout.Close() + kubecontainer.HandleResizing(resize, func(size term.Size) { + term.SetSize(p.Fd(), size) + }) + if stdin != nil { go io.Copy(p, stdin) } @@ -98,7 +103,7 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do // NativeExecHandler executes commands in Docker containers using Docker's exec API. type NativeExecHandler struct{} -func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { createOpts := dockertypes.ExecConfig{ Cmd: cmd, AttachStdin: stdin != nil, @@ -110,6 +115,13 @@ func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *doc if err != nil { return fmt.Errorf("failed to exec in container - Exec setup failed - %v", err) } + + // Have to start this before the call to client.StartExec because client.StartExec is a blocking + // call :-( Otherwise, resize events don't get processed and the terminal never resizes. + kubecontainer.HandleResizing(resize, func(size term.Size) { + client.ResizeExecTTY(execObj.ID, int(size.Height), int(size.Width)) + }) + startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty} streamOpts := StreamOptions{ InputStream: stdin, @@ -121,6 +133,7 @@ func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *doc if err != nil { return err } + ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() count := 0 diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index a9d3c2249b..c6246665ac 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -500,6 +500,20 @@ func (f *FakeDockerClient) updateContainerStatus(id, status string) { } } +func (f *FakeDockerClient) ResizeExecTTY(id string, height, width int) error { + f.Lock() + defer f.Unlock() + f.called = append(f.called, "resize_exec") + return nil +} + +func (f *FakeDockerClient) ResizeContainerTTY(id string, height, width int) error { + f.Lock() + defer f.Unlock() + f.called = append(f.called, "resize_container") + return nil +} + // FakeDockerPuller is a stub implementation of DockerPuller. type FakeDockerPuller struct { sync.Mutex diff --git a/pkg/kubelet/dockertools/instrumented_docker.go b/pkg/kubelet/dockertools/instrumented_docker.go index 3a6e85493b..8503fe2d30 100644 --- a/pkg/kubelet/dockertools/instrumented_docker.go +++ b/pkg/kubelet/dockertools/instrumented_docker.go @@ -213,3 +213,21 @@ func (in instrumentedDockerInterface) ImageHistory(id string) ([]dockertypes.Ima recordError(operation, err) return out, err } + +func (in instrumentedDockerInterface) ResizeExecTTY(id string, height, width int) error { + const operation = "resize_exec" + defer recordOperation(operation, time.Now()) + + err := in.client.ResizeExecTTY(id, height, width) + recordError(operation, err) + return err +} + +func (in instrumentedDockerInterface) ResizeContainerTTY(id string, height, width int) error { + const operation = "resize_container" + defer recordOperation(operation, time.Now()) + + err := in.client.ResizeContainerTTY(id, height, width) + recordError(operation, err) + return err +} diff --git a/pkg/kubelet/dockertools/kube_docker_client.go b/pkg/kubelet/dockertools/kube_docker_client.go index d66fc380cb..9b31b4b6e5 100644 --- a/pkg/kubelet/dockertools/kube_docker_client.go +++ b/pkg/kubelet/dockertools/kube_docker_client.go @@ -454,6 +454,24 @@ func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.Contain return d.holdHijackedConnection(sopts.RawTerminal, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp) } +func (d *kubeDockerClient) ResizeExecTTY(id string, height, width int) error { + ctx, cancel := d.getCancelableContext() + defer cancel() + return d.client.ContainerExecResize(ctx, id, dockertypes.ResizeOptions{ + Height: height, + Width: width, + }) +} + +func (d *kubeDockerClient) ResizeContainerTTY(id string, height, width int) error { + ctx, cancel := d.getCancelableContext() + defer cancel() + return d.client.ContainerResize(ctx, id, dockertypes.ResizeOptions{ + Height: height, + Width: width, + }) +} + // redirectResponseToOutputStream redirect the response stream to stdout and stderr. When tty is true, all stream will // only be redirected to stdout. func (d *kubeDockerClient) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d6f79a5d9c..29a8598c29 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -88,6 +88,7 @@ import ( utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/selinux" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/term" utilvalidation "k8s.io/kubernetes/pkg/util/validation" "k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/wait" @@ -3777,7 +3778,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe var buffer bytes.Buffer output := ioutils.WriteCloserWrapper(&buffer) - err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false) + err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false, nil) if err != nil { return nil, err } @@ -3787,7 +3788,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe // ExecInContainer executes a command in a container, connecting the supplied // stdin/stdout/stderr to the command's IO streams. -func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { podUID = kl.podManager.TranslatePodUID(podUID) container, err := kl.findContainer(podFullName, podUID, containerName) @@ -3797,12 +3798,12 @@ func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, contain if container == nil { return fmt.Errorf("container not found (%q)", containerName) } - return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty) + return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize) } // AttachContainer uses the container runtime to attach the given streams to // the given container. -func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { podUID = kl.podManager.TranslatePodUID(podUID) container, err := kl.findContainer(podFullName, podUID, containerName) @@ -3812,7 +3813,7 @@ func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, contain if container == nil { return fmt.Errorf("container not found (%q)", containerName) } - return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty) + return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize) } // PortForward connects to the pod's port and copies data between the port diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 27642a6b36..8caee89dd4 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -74,6 +74,7 @@ import ( "k8s.io/kubernetes/pkg/util/rand" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" @@ -1079,7 +1080,7 @@ type fakeContainerCommandRunner struct { Stream io.ReadWriteCloser } -func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { f.Cmd = cmd f.ID = id f.Stdin = in @@ -2096,6 +2097,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) { nil, nil, false, + nil, ) if err == nil { t.Fatal("unexpected non-error") @@ -2140,6 +2142,7 @@ func TestExecInContainerNoSuchContainer(t *testing.T) { nil, nil, false, + nil, ) if err == nil { t.Fatal("unexpected non-error") @@ -2200,6 +2203,7 @@ func TestExecInContainer(t *testing.T) { stdout, stderr, tty, + nil, ) if err != nil { t.Fatalf("unexpected error: %s", err) diff --git a/pkg/kubelet/lifecycle/handlers.go b/pkg/kubelet/lifecycle/handlers.go index f3894659f0..6ba3effc6e 100644 --- a/pkg/kubelet/lifecycle/handlers.go +++ b/pkg/kubelet/lifecycle/handlers.go @@ -60,7 +60,7 @@ func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod msg string ) output := ioutils.WriteCloserWrapper(&buffer) - err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false) + err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false, nil) if err != nil { msg := fmt.Sprintf("Exec lifecycle hook (%v) for Container %q in Pod %q failed - %q", handler.Exec.Command, container.Name, format.Pod(pod), buffer.String()) glog.V(1).Infof(msg) diff --git a/pkg/kubelet/lifecycle/handlers_test.go b/pkg/kubelet/lifecycle/handlers_test.go index 1bdd711421..74399a5369 100644 --- a/pkg/kubelet/lifecycle/handlers_test.go +++ b/pkg/kubelet/lifecycle/handlers_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/term" ) func TestResolvePortInt(t *testing.T) { @@ -80,7 +81,7 @@ type fakeContainerCommandRunner struct { ID kubecontainer.ContainerID } -func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { f.Cmd = cmd f.ID = id return nil diff --git a/pkg/kubelet/prober/prober.go b/pkg/kubelet/prober/prober.go index 19e02238c3..c307b2e0e5 100644 --- a/pkg/kubelet/prober/prober.go +++ b/pkg/kubelet/prober/prober.go @@ -228,7 +228,7 @@ func (p *prober) newExecInContainer(container api.Container, containerID kubecon return execInContainer{func() ([]byte, error) { var buffer bytes.Buffer output := ioutils.WriteCloserWrapper(&buffer) - err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false) + err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false, nil) if err != nil { return nil, err } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index ff589c6a47..c6ac08b8f3 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -60,6 +60,7 @@ import ( "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/selinux" utilstrings "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/util/term" utilwait "k8s.io/kubernetes/pkg/util/wait" ) @@ -2007,14 +2008,14 @@ func newRktExitError(e error) error { return e } -func (r *Runtime) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (r *Runtime) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { return fmt.Errorf("unimplemented") } // Note: In rkt, the container ID is in the form of "UUID:appName", where UUID is // the rkt UUID, and appName is the container name. // TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail. -func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { +func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { glog.V(4).Infof("Rkt execing in container.") id, err := parseContainerID(containerID) @@ -2035,6 +2036,10 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s // make sure to close the stdout stream defer stdout.Close() + kubecontainer.HandleResizing(resize, func(size term.Size) { + term.SetSize(p.Fd(), size) + }) + if stdin != nil { go io.Copy(p, stdin) } diff --git a/pkg/kubelet/server/remotecommand/attach.go b/pkg/kubelet/server/remotecommand/attach.go index a7a12ea166..372ae54a21 100644 --- a/pkg/kubelet/server/remotecommand/attach.go +++ b/pkg/kubelet/server/remotecommand/attach.go @@ -25,13 +25,14 @@ import ( "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/term" ) // Attacher knows how to attach to a running container in a pod. type Attacher interface { // AttachContainer attaches to the running container in the pod, copying data between in/out/err // and the container's stdin/stdout/stderr. - AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error + AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error } // ServeAttach handles requests to attach to a container. After creating/receiving the required @@ -44,7 +45,7 @@ func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, po } defer ctx.conn.Close() - err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty) + err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan) if err != nil { msg := fmt.Sprintf("error attaching to container: %v", err) runtime.HandleError(errors.New(msg)) diff --git a/pkg/kubelet/server/remotecommand/contants.go b/pkg/kubelet/server/remotecommand/constants.go similarity index 77% rename from pkg/kubelet/server/remotecommand/contants.go rename to pkg/kubelet/server/remotecommand/constants.go index 1ecb072a11..33e0dce29a 100644 --- a/pkg/kubelet/server/remotecommand/contants.go +++ b/pkg/kubelet/server/remotecommand/constants.go @@ -31,6 +31,11 @@ const ( // attachment/execution. It is the second version of the subprotocol and // resolves the issues present in the first version. StreamProtocolV2Name = "v2.channel.k8s.io" + + // The SPDY subprotocol "v3.channel.k8s.io" is used for remote command + // attachment/execution. It is the third version of the subprotocol and + // adds support for resizing container terminals. + StreamProtocolV3Name = "v3.channel.k8s.io" ) -var SupportedStreamingProtocols = []string{StreamProtocolV2Name, StreamProtocolV1Name} +var SupportedStreamingProtocols = []string{StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name} diff --git a/pkg/kubelet/server/remotecommand/exec.go b/pkg/kubelet/server/remotecommand/exec.go index e77081f698..fcc4c230bf 100644 --- a/pkg/kubelet/server/remotecommand/exec.go +++ b/pkg/kubelet/server/remotecommand/exec.go @@ -26,13 +26,14 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/term" ) // Executor knows how to execute a command in a container in a pod. type Executor interface { // ExecInContainer executes a command in a container in the pod, copying data // between in/out/err and the container's stdin/stdout/stderr. - ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error + ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error } // ServeExec handles requests to execute a command in a container. After @@ -48,7 +49,7 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podN cmd := req.URL.Query()[api.ExecCommandParamm] - err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty) + err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan) if err != nil { msg := fmt.Sprintf("error executing command in container: %v", err) runtime.HandleError(errors.New(msg)) diff --git a/pkg/kubelet/server/remotecommand/httpstream.go b/pkg/kubelet/server/remotecommand/httpstream.go index f42b07e611..ccd0c3f007 100644 --- a/pkg/kubelet/server/remotecommand/httpstream.go +++ b/pkg/kubelet/server/remotecommand/httpstream.go @@ -17,6 +17,7 @@ limitations under the License. package remotecommand import ( + "encoding/json" "errors" "fmt" "io" @@ -27,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/wsstream" "github.com/golang/glog" @@ -87,6 +89,8 @@ type context struct { stdoutStream io.WriteCloser stderrStream io.WriteCloser errorStream io.WriteCloser + resizeStream io.ReadCloser + resizeChan chan term.Size tty bool } @@ -118,10 +122,26 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt return nil, false } + var ctx *context + var ok bool if wsstream.IsWebSocketRequest(req) { - return createWebSocketStreams(req, w, opts, idleTimeout) + ctx, ok = createWebSocketStreams(req, w, opts, idleTimeout) + } else { + ctx, ok = createHttpStreamStreams(req, w, opts, supportedStreamProtocols, idleTimeout, streamCreationTimeout) + } + if !ok { + return nil, false } + if ctx.resizeStream != nil { + ctx.resizeChan = make(chan term.Size) + go handleResizeEvents(ctx.resizeStream, ctx.resizeChan) + } + + return ctx, true +} + +func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) { protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols) if err != nil { w.WriteHeader(http.StatusBadRequest) @@ -148,6 +168,8 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt var handler protocolHandler switch protocol { + case StreamProtocolV3Name: + handler = &v3ProtocolHandler{} case StreamProtocolV2Name: handler = &v2ProtocolHandler{} case "": @@ -157,6 +179,10 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt handler = &v1ProtocolHandler{} } + if opts.tty && handler.supportsTerminalResizing() { + opts.expectedStreams++ + } + expired := time.NewTimer(streamCreationTimeout) ctx, err := handler.waitForStreams(streamCh, opts.expectedStreams, expired.C) @@ -167,6 +193,7 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt ctx.conn = conn ctx.tty = opts.tty + return ctx, true } @@ -174,8 +201,61 @@ type protocolHandler interface { // waitForStreams waits for the expected streams or a timeout, returning a // remoteCommandContext if all the streams were received, or an error if not. waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) + // supportsTerminalResizing returns true if the protocol handler supports terminal resizing + supportsTerminalResizing() bool } +// v3ProtocolHandler implements the V3 protocol version for streaming command execution. +type v3ProtocolHandler struct{} + +func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) { + ctx := &context{} + receivedStreams := 0 + replyChan := make(chan struct{}) + stop := make(chan struct{}) + defer close(stop) +WaitForStreams: + for { + select { + case stream := <-streams: + streamType := stream.Headers().Get(api.StreamType) + switch streamType { + case api.StreamTypeError: + ctx.errorStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeStdin: + ctx.stdinStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeStdout: + ctx.stdoutStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeStderr: + ctx.stderrStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + case api.StreamTypeResize: + ctx.resizeStream = stream + go waitStreamReply(stream.replySent, replyChan, stop) + default: + runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType)) + } + case <-replyChan: + receivedStreams++ + if receivedStreams == expectedStreams { + break WaitForStreams + } + case <-expired: + // TODO find a way to return the error to the user. Maybe use a separate + // stream to report errors? + return nil, errors.New("timed out waiting for client to create streams") + } + } + + return ctx, nil +} + +// supportsTerminalResizing returns true because v3ProtocolHandler supports it +func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true } + // v2ProtocolHandler implements the V2 protocol version for streaming command execution. type v2ProtocolHandler struct{} @@ -221,6 +301,9 @@ WaitForStreams: return ctx, nil } +// supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it. +func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false } + // v1ProtocolHandler implements the V1 protocol version for streaming command execution. type v1ProtocolHandler struct{} @@ -275,3 +358,19 @@ WaitForStreams: return ctx, nil } + +// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it. +func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false } + +func handleResizeEvents(stream io.Reader, channel chan<- term.Size) { + defer runtime.HandleCrash() + + decoder := json.NewDecoder(stream) + for { + size := term.Size{} + if err := decoder.Decode(&size); err != nil { + break + } + channel <- size + } +} diff --git a/pkg/kubelet/server/remotecommand/websocket.go b/pkg/kubelet/server/remotecommand/websocket.go index d9a0665179..efeb1b6646 100644 --- a/pkg/kubelet/server/remotecommand/websocket.go +++ b/pkg/kubelet/server/remotecommand/websocket.go @@ -17,61 +17,82 @@ limitations under the License. package remotecommand import ( + "fmt" "net/http" "time" "k8s.io/kubernetes/pkg/httplog" + "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wsstream" - - "github.com/golang/glog" ) -// standardShellChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2) -// along with the approximate duplex value. Supported subprotocols are "channel.k8s.io" and -// "base64.channel.k8s.io". -func standardShellChannels(stdin, stdout, stderr bool) []wsstream.ChannelType { - // open three half-duplex channels - channels := []wsstream.ChannelType{wsstream.ReadChannel, wsstream.WriteChannel, wsstream.WriteChannel} - if !stdin { - channels[0] = wsstream.IgnoreChannel - } - if !stdout { - channels[1] = wsstream.IgnoreChannel - } - if !stderr { - channels[2] = wsstream.IgnoreChannel - } +const ( + stdinChannel = iota + stdoutChannel + stderrChannel + errorChannel + resizeChannel +) + +// createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2) +// along with the approximate duplex value. It also creates the error (3) and resize (4) channels. +func createChannels(opts *options) []wsstream.ChannelType { + // open the requested channels, and always open the error channel + channels := make([]wsstream.ChannelType, 5) + channels[stdinChannel] = readChannel(opts.stdin) + channels[stdoutChannel] = writeChannel(opts.stdout) + channels[stderrChannel] = writeChannel(opts.stderr) + channels[errorChannel] = wsstream.WriteChannel + channels[resizeChannel] = wsstream.ReadChannel return channels } -// createWebSocketStreams returns a remoteCommandContext containing the websocket connection and +// readChannel returns wsstream.ReadChannel if real is true, or wsstream.IgnoreChannel. +func readChannel(real bool) wsstream.ChannelType { + if real { + return wsstream.ReadChannel + } + return wsstream.IgnoreChannel +} + +// writeChannel returns wsstream.WriteChannel if real is true, or wsstream.IgnoreChannel. +func writeChannel(real bool) wsstream.ChannelType { + if real { + return wsstream.WriteChannel + } + return wsstream.IgnoreChannel +} + +// createWebSocketStreams returns a context containing the websocket connection and // streams needed to perform an exec or an attach. func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *options, idleTimeout time.Duration) (*context, bool) { - // open the requested channels, and always open the error channel - channels := append(standardShellChannels(opts.stdin, opts.stdout, opts.stderr), wsstream.WriteChannel) + channels := createChannels(opts) conn := wsstream.NewConn(channels...) conn.SetIdleTimeout(idleTimeout) streams, err := conn.Open(httplog.Unlogged(w), req) if err != nil { - glog.Errorf("Unable to upgrade websocket connection: %v", err) + runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err)) return nil, false } + // Send an empty message to the lowest writable channel to notify the client the connection is established // TODO: make generic to SPDY and WebSockets and do it outside of this method? switch { case opts.stdout: - streams[1].Write([]byte{}) + streams[stdoutChannel].Write([]byte{}) case opts.stderr: - streams[2].Write([]byte{}) + streams[stderrChannel].Write([]byte{}) default: - streams[3].Write([]byte{}) + streams[errorChannel].Write([]byte{}) } + return &context{ conn: conn, - stdinStream: streams[0], - stdoutStream: streams[1], - stderrStream: streams[2], - errorStream: streams[3], + stdinStream: streams[stdinChannel], + stdoutStream: streams[stdoutChannel], + stderrStream: streams[stderrChannel], + errorStream: streams[errorChannel], tty: opts.tty, + resizeStream: streams[resizeChannel], }, true } diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index bd24e94338..cfb3600b5b 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -58,6 +58,7 @@ import ( "k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/limitwriter" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -160,8 +161,8 @@ type HostInterface interface { GetRunningPods() ([]*api.Pod, error) GetPodByName(namespace, name string) (*api.Pod, bool) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) - ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error - AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error + ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error + AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error ServeLogs(w http.ResponseWriter, req *http.Request) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 3d38931ac9..1bc04481f5 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/volume" ) @@ -119,11 +120,11 @@ func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, contain return fk.runFunc(podFullName, uid, containerName, cmd) } -func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { return fk.execFunc(name, uid, container, cmd, in, out, err, tty) } -func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error { +func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { return fk.attachFunc(name, uid, container, in, out, err, tty) } diff --git a/pkg/util/term/resize.go b/pkg/util/term/resize.go new file mode 100644 index 0000000000..155c840d6a --- /dev/null +++ b/pkg/util/term/resize.go @@ -0,0 +1,147 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package term + +import ( + "fmt" + + "github.com/docker/docker/pkg/term" + "k8s.io/kubernetes/pkg/util/runtime" +) + +// Size represents the width and height of a terminal. +type Size struct { + Width uint16 + Height uint16 +} + +// GetSize returns the current size of the user's terminal. If it isn't a terminal, +// nil is returned. +func (t TTY) GetSize() *Size { + if !t.IsTerminalOut() { + return nil + } + return GetSize(t.Out.(fd).Fd()) +} + +// GetSize returns the current size of the terminal associated with fd. +func GetSize(fd uintptr) *Size { + winsize, err := term.GetWinsize(fd) + if err != nil { + runtime.HandleError(fmt.Errorf("unable to get terminal size: %v", err)) + return nil + } + + return &Size{Width: winsize.Width, Height: winsize.Height} +} + +// SetSize sets the terminal size associated with fd. +func SetSize(fd uintptr, size Size) error { + return term.SetWinsize(fd, &term.Winsize{Height: size.Height, Width: size.Width}) +} + +// MonitorSize monitors the terminal's size. It returns a TerminalSizeQueue primed with +// initialSizes, or nil if there's no TTY present. +func (t *TTY) MonitorSize(initialSizes ...*Size) TerminalSizeQueue { + if !t.IsTerminalOut() { + return nil + } + + t.sizeQueue = &sizeQueue{ + t: *t, + // make it buffered so we can send the initial terminal sizes without blocking, prior to starting + // the streaming below + resizeChan: make(chan Size, len(initialSizes)), + stopResizing: make(chan struct{}), + } + + t.sizeQueue.monitorSize(initialSizes...) + + return t.sizeQueue +} + +// TerminalSizeQueue is capable of returning terminal resize events as they occur. +type TerminalSizeQueue interface { + // Next returns the new terminal size after the terminal has been resized. It returns nil when + // monitoring has been stopped. + Next() *Size +} + +// sizeQueue implements TerminalSizeQueue +type sizeQueue struct { + t TTY + // resizeChan receives a Size each time the user's terminal is resized. + resizeChan chan Size + stopResizing chan struct{} +} + +// make sure sizeQueue implements the TerminalSizeQueue interface +var _ TerminalSizeQueue = &sizeQueue{} + +// monitorSize primes resizeChan with initialSizes and then monitors for resize events. With each +// new event, it sends the current terminal size to resizeChan. +func (s *sizeQueue) monitorSize(initialSizes ...*Size) { + // send the initial sizes + for i := range initialSizes { + if initialSizes[i] != nil { + s.resizeChan <- *initialSizes[i] + } + } + + resizeEvents := make(chan Size, 1) + + monitorResizeEvents(s.t.Out.(fd).Fd(), resizeEvents, s.stopResizing) + + // listen for resize events in the background + go func() { + defer runtime.HandleCrash() + + for { + select { + case size, ok := <-resizeEvents: + if !ok { + return + } + + select { + // try to send the size to resizeChan, but don't block + case s.resizeChan <- size: + // send successful + default: + // unable to send / no-op + } + case <-s.stopResizing: + return + } + } + }() +} + +// Next returns the new terminal size after the terminal has been resized. It returns nil when +// monitoring has been stopped. +func (s *sizeQueue) Next() *Size { + size, ok := <-s.resizeChan + if !ok { + return nil + } + return &size +} + +// stop stops the background goroutine that is monitoring for terminal resizes. +func (s *sizeQueue) stop() { + close(s.stopResizing) +} diff --git a/pkg/util/term/resizeevents.go b/pkg/util/term/resizeevents.go new file mode 100644 index 0000000000..70858ed03f --- /dev/null +++ b/pkg/util/term/resizeevents.go @@ -0,0 +1,60 @@ +// +build !windows + +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package term + +import ( + "os" + "os/signal" + "syscall" + + "k8s.io/kubernetes/pkg/util/runtime" +) + +// monitorResizeEvents spawns a goroutine that waits for SIGWINCH signals (these indicate the +// terminal has resized). After receiving a SIGWINCH, this gets the terminal size and tries to send +// it to the resizeEvents channel. The goroutine stops when the stop channel is closed. +func monitorResizeEvents(fd uintptr, resizeEvents chan<- Size, stop chan struct{}) { + go func() { + defer runtime.HandleCrash() + + winch := make(chan os.Signal, 1) + signal.Notify(winch, syscall.SIGWINCH) + defer signal.Stop(winch) + + for { + select { + case <-winch: + size := GetSize(fd) + if size == nil { + return + } + + // try to send size + select { + case resizeEvents <- *size: + // success + default: + // not sent + } + case <-stop: + return + } + } + }() +} diff --git a/pkg/util/term/resizeevents_windows.go b/pkg/util/term/resizeevents_windows.go new file mode 100644 index 0000000000..aa6de728b6 --- /dev/null +++ b/pkg/util/term/resizeevents_windows.go @@ -0,0 +1,57 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package term + +import ( + "time" + + "k8s.io/kubernetes/pkg/util/runtime" +) + +// monitorResizeEvents spawns a goroutine that periodically gets the terminal size and tries to send +// it to the resizeEvents channel if the size has changed. The goroutine stops when the stop channel +// is closed. +func monitorResizeEvents(fd uintptr, resizeEvents chan<- Size, stop chan struct{}) { + go func() { + defer runtime.HandleCrash() + + var lastSize Size + + for { + // see if we need to stop running + select { + case <-stop: + return + default: + } + + size := GetSize(fd) + if size == nil { + return + } + + if size.Height != lastSize.Height || size.Width != lastSize.Width { + lastSize.Height = size.Height + lastSize.Width = size.Width + resizeEvents <- *size + } + + // sleep to avoid hot looping + time.Sleep(250 * time.Millisecond) + } + }() +} diff --git a/pkg/util/term/term.go b/pkg/util/term/term.go index 68f99774df..14c7c3340a 100644 --- a/pkg/util/term/term.go +++ b/pkg/util/term/term.go @@ -21,17 +21,22 @@ import ( "os" "github.com/docker/docker/pkg/term" + "k8s.io/kubernetes/pkg/util/interrupt" ) // SafeFunc is a function to be invoked by TTY. type SafeFunc func() error -// TTY helps invoke a function and preserve the state of the terminal, even if the -// process is terminated during execution. +// TTY helps invoke a function and preserve the state of the terminal, even if the process is +// terminated during execution. It also provides support for terminal resizing for remote command +// execution/attachment. type TTY struct { - // In is a reader to check for a terminal. + // In is a reader representing stdin. It is a required field. In io.Reader + // Out is a writer representing stdout. It must be set to support terminal resizing. It is an + // optional field. + Out io.Writer // Raw is true if the terminal should be set raw. Raw bool // TryDev indicates the TTY should try to open /dev/tty if the provided input @@ -41,6 +46,10 @@ type TTY struct { // it will be invoked after the terminal state is restored. If it is not provided, // a signal received during the TTY will result in os.Exit(0) being invoked. Parent *interrupt.Handler + + // sizeQueue is set after a call to MonitorSize() and is used to monitor SIGWINCH signals when the + // user's terminal resizes. + sizeQueue *sizeQueue } // fd returns a file descriptor for a given object. @@ -48,10 +57,22 @@ type fd interface { Fd() uintptr } -// IsTerminal returns true if the provided input is a terminal. Does not check /dev/tty +// IsTerminalIn returns true if t.In is a terminal. Does not check /dev/tty // even if TryDev is set. -func (t TTY) IsTerminal() bool { - return IsTerminal(t.In) +func (t TTY) IsTerminalIn() bool { + return isTerminal(t.In) +} + +// IsTerminalOut returns true if t.Out is a terminal. Does not check /dev/tty +// even if TryDev is set. +func (t TTY) IsTerminalOut() bool { + return isTerminal(t.Out) +} + +// isTerminal returns whether the passed object is a terminal or not +func isTerminal(i interface{}) bool { + file, ok := i.(fd) + return ok && term.IsTerminal(file.Fd()) } // Safe invokes the provided function and will attempt to ensure that when the @@ -90,11 +111,11 @@ func (t TTY) Safe(fn SafeFunc) error { if err != nil { return err } - return interrupt.Chain(t.Parent, func() { term.RestoreTerminal(inFd, state) }).Run(fn) -} + return interrupt.Chain(t.Parent, func() { + if t.sizeQueue != nil { + t.sizeQueue.stop() + } -// IsTerminal returns whether the passed io.Reader is a terminal or not -func IsTerminal(r io.Reader) bool { - file, ok := r.(fd) - return ok && term.IsTerminal(file.Fd()) + term.RestoreTerminal(inFd, state) + }).Run(fn) } diff --git a/test/e2e_node/exec_util.go b/test/e2e_node/exec_util.go index 31afc64a68..1986091cda 100644 --- a/test/e2e_node/exec_util.go +++ b/test/e2e_node/exec_util.go @@ -34,7 +34,13 @@ func execute(method string, url *url.URL, config *restclient.Config, stdin io.Re if err != nil { return err } - return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty) + return exec.Stream(remotecommand.StreamOptions{ + SupportedProtocols: remotecommandserver.SupportedStreamingProtocols, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + }) } func execCommandInContainer(config *restclient.Config, c *client.Client, ns, podName, containerName string, cmd []string) (string, error) {