Support terminal resizing for exec/attach/run

Add support for terminal resizing for exec, attach, and run. Note that for Docker, exec sessions
inherit the environment from the primary process, so if the container was created with tty=false,
that means the exec session's TERM variable will default to "dumb". Users can override this by
setting TERM=xterm (or whatever is appropriate) to get the correct "smart" terminal behavior.
pull/6/head
Andy Goldstein 2016-04-18 12:54:44 -04:00
parent ec6181d5d3
commit 3b21a9901b
41 changed files with 1247 additions and 247 deletions

View File

@ -17,14 +17,15 @@ limitations under the License.
package main
import (
"os"
"github.com/docker/docker/pkg/term"
"k8s.io/kubernetes/pkg/kubectl/cmd"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
)
func NewKubectlServer() *Server {
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr)
// need to use term.StdStreams to get the right IO refs on Windows
stdin, stdout, stderr := term.StdStreams()
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), stdin, stdout, stderr)
localFlags := cmd.LocalFlags()
localFlags.SetInterspersed(false)

View File

@ -17,7 +17,7 @@ limitations under the License.
package app
import (
"os"
"github.com/docker/docker/pkg/term"
"k8s.io/kubernetes/pkg/kubectl/cmd"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
@ -28,6 +28,8 @@ WARNING: this logic is duplicated, with minor changes, in cmd/hyperkube/kubectl.
Any salient changes here will need to be manually reflected in that file.
*/
func Run() error {
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr)
// need to use term.StdStreams to get the right IO refs on Windows
stdin, stdout, stderr := term.StdStreams()
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), stdin, stdout, stderr)
return cmd.Execute()
}

View File

@ -2751,6 +2751,8 @@ const (
StreamTypeData = "data"
// Value for streamType header for error stream
StreamTypeError = "error"
// Value for streamType header for terminal resize stream
StreamTypeResize = "resize"
// Name of header that specifies the port being forwarded
PortHeader = "port"

View File

@ -29,15 +29,28 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/term"
)
// StreamOptions holds information pertaining to the current streaming session: supported stream
// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
// support terminal resizing.
type StreamOptions struct {
SupportedProtocols []string
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
Tty bool
TerminalSizeQueue term.TerminalSizeQueue
}
// Executor is an interface for transporting shell-style streams.
type Executor interface {
// Stream initiates the transport of the standard shell streams. It will transport any
// non-nil stream to a remote system, and return an error if a problem occurs. If tty
// is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
// stdout stream).
Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
Stream(options StreamOptions) error
}
// StreamExecutor supports the ability to dial an httpstream connection and the ability to
@ -129,14 +142,18 @@ func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, strin
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
}
type streamCreator interface {
CreateStream(headers http.Header) (httpstream.Stream, error)
}
type streamProtocolHandler interface {
stream(httpstream.Connection) error
stream(conn streamCreator) error
}
// Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects.
func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
conn, protocol, err := e.Dial(supportedProtocols...)
func (e *streamExecutor) Stream(options StreamOptions) error {
conn, protocol, err := e.Dial(options.SupportedProtocols...)
if err != nil {
return err
}
@ -145,23 +162,15 @@ func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, st
var streamer streamProtocolHandler
switch protocol {
case remotecommand.StreamProtocolV3Name:
streamer = newStreamProtocolV3(options)
case remotecommand.StreamProtocolV2Name:
streamer = &streamProtocolV2{
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
}
streamer = newStreamProtocolV2(options)
case "":
glog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
fallthrough
case remotecommand.StreamProtocolV1Name:
streamer = &streamProtocolV1{
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
}
streamer = newStreamProtocolV1(options)
}
return streamer.stream(conn)

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/term"
)
type fakeExecutor struct {
@ -52,11 +53,11 @@ type fakeExecutor struct {
exec bool
}
func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
return ex.run(name, uid, container, cmd, in, out, err, tty)
}
func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error {
func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
return ex.run(name, uid, container, nil, in, out, err, tty)
}
@ -253,7 +254,13 @@ func TestStream(t *testing.T) {
t.Errorf("%s: unexpected error: %v", name, err)
continue
}
err = e.Stream(testCase.ClientProtocols, streamIn, streamOut, streamErr, testCase.Tty)
err = e.Stream(StreamOptions{
SupportedProtocols: testCase.ClientProtocols,
Stdin: streamIn,
Stdout: streamOut,
Stderr: streamErr,
Tty: testCase.Tty,
})
hasErr := err != nil
if len(testCase.Error) > 0 {
@ -277,13 +284,13 @@ func TestStream(t *testing.T) {
if len(testCase.Stdout) > 0 {
if e, a := strings.Repeat(testCase.Stdout, testCase.MessageCount), localOut; e != a.String() {
t.Errorf("%s: expected stdout data '%s', got '%s'", name, e, a)
t.Errorf("%s: expected stdout data %q, got %q", name, e, a)
}
}
if testCase.Stderr != "" {
if e, a := strings.Repeat(testCase.Stderr, testCase.MessageCount), localErr; e != a.String() {
t.Errorf("%s: expected stderr data '%s', got '%s'", name, e, a)
t.Errorf("%s: expected stderr data %q, got %q", name, e, a)
}
}

View File

@ -28,19 +28,27 @@ import (
)
// streamProtocolV1 implements the first version of the streaming exec & attach
// protocol. This version has some bugs, such as not being able to detecte when
// protocol. This version has some bugs, such as not being able to detect when
// non-interactive stdin data has ended. See http://issues.k8s.io/13394 and
// http://issues.k8s.io/13395 for more details.
type streamProtocolV1 struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
tty bool
StreamOptions
errorStream httpstream.Stream
remoteStdin httpstream.Stream
remoteStdout httpstream.Stream
remoteStderr httpstream.Stream
}
var _ streamProtocolHandler = &streamProtocolV1{}
func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
return &streamProtocolV1{
StreamOptions: options,
}
}
func (p *streamProtocolV1) stream(conn streamCreator) error {
doneChan := make(chan struct{}, 2)
errorChan := make(chan error)
@ -55,19 +63,15 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
}
}
var (
err error
errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream
)
// set up all the streams first
var err error
headers := http.Header{}
headers.Set(api.StreamType, api.StreamTypeError)
errorStream, err = conn.CreateStream(headers)
p.errorStream, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer errorStream.Reset()
defer p.errorStream.Reset()
// Create all the streams first, then start the copy goroutines. The server doesn't start its copy
// goroutines until it's received all of the streams. If the client creates the stdin stream and
@ -76,38 +80,38 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
// getting processed because the server hasn't started its copying, and it won't do that until it
// gets all the streams. By creating all the streams first, we ensure that the server is ready to
// process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
if e.stdin != nil {
if p.Stdin != nil {
headers.Set(api.StreamType, api.StreamTypeStdin)
remoteStdin, err = conn.CreateStream(headers)
p.remoteStdin, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStdin.Reset()
defer p.remoteStdin.Reset()
}
if e.stdout != nil {
if p.Stdout != nil {
headers.Set(api.StreamType, api.StreamTypeStdout)
remoteStdout, err = conn.CreateStream(headers)
p.remoteStdout, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStdout.Reset()
defer p.remoteStdout.Reset()
}
if e.stderr != nil && !e.tty {
if p.Stderr != nil && !p.Tty {
headers.Set(api.StreamType, api.StreamTypeStderr)
remoteStderr, err = conn.CreateStream(headers)
p.remoteStderr, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStderr.Reset()
defer p.remoteStderr.Reset()
}
// now that all the streams have been created, proceed with reading & copying
// always read from errorStream
go func() {
message, err := ioutil.ReadAll(errorStream)
message, err := ioutil.ReadAll(p.errorStream)
if err != nil && err != io.EOF {
errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
return
@ -118,25 +122,25 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
}
}()
if e.stdin != nil {
if p.Stdin != nil {
// TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
// because stdin is not closed until the process exits. If we try to call
// stdin.Close(), it returns no error but doesn't unblock the copy. It will
// exit when the process exits, instead.
go cp(api.StreamTypeStdin, remoteStdin, e.stdin)
go cp(api.StreamTypeStdin, p.remoteStdin, p.Stdin)
}
waitCount := 0
completedStreams := 0
if e.stdout != nil {
if p.Stdout != nil {
waitCount++
go cp(api.StreamTypeStdout, e.stdout, remoteStdout)
go cp(api.StreamTypeStdout, p.Stdout, p.remoteStdout)
}
if e.stderr != nil && !e.tty {
if p.Stderr != nil && !p.Tty {
waitCount++
go cp(api.StreamTypeStderr, e.stderr, remoteStderr)
go cp(api.StreamTypeStderr, p.Stderr, p.remoteStderr)
}
Loop:

View File

@ -24,7 +24,6 @@ import (
"sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/runtime"
)
@ -33,63 +32,69 @@ import (
// version is referred to as version 2, even though it is the first actual
// numbered version.
type streamProtocolV2 struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
tty bool
StreamOptions
errorStream io.Reader
remoteStdin io.ReadWriteCloser
remoteStdout io.Reader
remoteStderr io.Reader
}
var _ streamProtocolHandler = &streamProtocolV2{}
func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
var (
err error
errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream
)
func newStreamProtocolV2(options StreamOptions) streamProtocolHandler {
return &streamProtocolV2{
StreamOptions: options,
}
}
func (p *streamProtocolV2) createStreams(conn streamCreator) error {
var err error
headers := http.Header{}
// set up all the streams first
// set up error stream
errorChan := make(chan error)
headers.Set(api.StreamType, api.StreamTypeError)
errorStream, err = conn.CreateStream(headers)
p.errorStream, err = conn.CreateStream(headers)
if err != nil {
return err
}
// set up stdin stream
if e.stdin != nil {
if p.Stdin != nil {
headers.Set(api.StreamType, api.StreamTypeStdin)
remoteStdin, err = conn.CreateStream(headers)
p.remoteStdin, err = conn.CreateStream(headers)
if err != nil {
return err
}
}
// set up stdout stream
if e.stdout != nil {
if p.Stdout != nil {
headers.Set(api.StreamType, api.StreamTypeStdout)
remoteStdout, err = conn.CreateStream(headers)
p.remoteStdout, err = conn.CreateStream(headers)
if err != nil {
return err
}
}
// set up stderr stream
if e.stderr != nil && !e.tty {
if p.Stderr != nil && !p.Tty {
headers.Set(api.StreamType, api.StreamTypeStderr)
remoteStderr, err = conn.CreateStream(headers)
p.remoteStderr, err = conn.CreateStream(headers)
if err != nil {
return err
}
}
return nil
}
// now that all the streams have been created, proceed with reading & copying
func (p *streamProtocolV2) setupErrorStreamReading() chan error {
errorChan := make(chan error)
// always read from errorStream
go func() {
message, err := ioutil.ReadAll(errorStream)
defer runtime.HandleCrash()
message, err := ioutil.ReadAll(p.errorStream)
switch {
case err != nil && err != io.EOF:
errorChan <- fmt.Errorf("error reading from error stream: %s", err)
@ -101,18 +106,23 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
close(errorChan)
}()
var wg sync.WaitGroup
var once sync.Once
return errorChan
}
func (p *streamProtocolV2) copyStdin() {
if p.Stdin != nil {
var once sync.Once
if e.stdin != nil {
// copy from client's stdin to container's stdin
go func() {
// if e.stdin is noninteractive, e.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
// we close remoteStdin as soon as the copy from e.stdin to remoteStdin finishes. Otherwise
// the executed command will remain running.
defer once.Do(func() { remoteStdin.Close() })
defer runtime.HandleCrash()
if _, err := io.Copy(remoteStdin, e.stdin); err != nil {
// if p.stdin is noninteractive, p.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
// we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise
// the executed command will remain running.
defer once.Do(func() { p.remoteStdin.Close() })
if _, err := io.Copy(p.remoteStdin, p.Stdin); err != nil {
runtime.HandleError(err)
}
}()
@ -121,6 +131,9 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
// be able to exit interactive sessions cleanly and not leak goroutines or
// hang the client's terminal.
//
// TODO we aren't using go-dockerclient any more; revisit this to determine if it's still
// required by engine-api.
//
// go-dockerclient's current hijack implementation
// (https://github.com/fsouza/go-dockerclient/blob/89f3d56d93788dfe85f864a44f85d9738fca0670/client.go#L564)
// waits for all three streams (stdin/stdout/stderr) to finish copying
@ -129,35 +142,65 @@ func (e *streamProtocolV2) stream(conn httpstream.Connection) error {
// When that happens, we must Close() on our side of remoteStdin, to
// allow the copy in hijack to complete, and hijack to return.
go func() {
defer once.Do(func() { remoteStdin.Close() })
defer runtime.HandleCrash()
defer once.Do(func() { p.remoteStdin.Close() })
// this "copy" doesn't actually read anything - it's just here to wait for
// the server to close remoteStdin.
if _, err := io.Copy(ioutil.Discard, remoteStdin); err != nil {
if _, err := io.Copy(ioutil.Discard, p.remoteStdin); err != nil {
runtime.HandleError(err)
}
}()
}
}
func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) {
if p.Stdout == nil {
return
}
if e.stdout != nil {
wg.Add(1)
go func() {
defer wg.Done()
if _, err := io.Copy(e.stdout, remoteStdout); err != nil {
runtime.HandleError(err)
}
}()
wg.Add(1)
go func() {
defer runtime.HandleCrash()
defer wg.Done()
if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil {
runtime.HandleError(err)
}
}()
}
func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) {
if p.Stderr == nil || p.Tty {
return
}
if e.stderr != nil && !e.tty {
wg.Add(1)
go func() {
defer wg.Done()
if _, err := io.Copy(e.stderr, remoteStderr); err != nil {
runtime.HandleError(err)
}
}()
wg.Add(1)
go func() {
defer runtime.HandleCrash()
defer wg.Done()
if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil {
runtime.HandleError(err)
}
}()
}
func (p *streamProtocolV2) stream(conn streamCreator) error {
if err := p.createStreams(conn); err != nil {
return err
}
// now that all the streams have been created, proceed with reading & copying
errorChan := p.setupErrorStreamReading()
p.copyStdin()
var wg sync.WaitGroup
p.copyStdout(&wg)
p.copyStderr(&wg)
// we're waiting for stdout/stderr to finish copying
wg.Wait()

View File

@ -0,0 +1,228 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"errors"
"io"
"net/http"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/wait"
)
type fakeReader struct {
err error
}
func (r *fakeReader) Read([]byte) (int, error) { return 0, r.err }
type fakeWriter struct{}
func (*fakeWriter) Write([]byte) (int, error) { return 0, nil }
type fakeStreamCreator struct {
created map[string]bool
errors map[string]error
}
var _ streamCreator = &fakeStreamCreator{}
func (f *fakeStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, error) {
streamType := headers.Get(api.StreamType)
f.created[streamType] = true
return nil, f.errors[streamType]
}
func TestV2CreateStreams(t *testing.T) {
tests := []struct {
name string
stdin bool
stdinError error
stdout bool
stdoutError error
stderr bool
stderrError error
errorError error
tty bool
expectError bool
}{
{
name: "stdin error",
stdin: true,
stdinError: errors.New("stdin error"),
expectError: true,
},
{
name: "stdout error",
stdout: true,
stdoutError: errors.New("stdout error"),
expectError: true,
},
{
name: "stderr error",
stderr: true,
stderrError: errors.New("stderr error"),
expectError: true,
},
{
name: "error stream error",
stdin: true,
stdout: true,
stderr: true,
errorError: errors.New("error stream error"),
expectError: true,
},
{
name: "no errors",
stdin: true,
stdout: true,
stderr: true,
expectError: false,
},
{
name: "no errors, stderr & tty set, don't expect stderr",
stdin: true,
stdout: true,
stderr: true,
tty: true,
expectError: false,
},
}
for _, test := range tests {
conn := &fakeStreamCreator{
created: make(map[string]bool),
errors: map[string]error{
api.StreamTypeStdin: test.stdinError,
api.StreamTypeStdout: test.stdoutError,
api.StreamTypeStderr: test.stderrError,
api.StreamTypeError: test.errorError,
},
}
opts := StreamOptions{Tty: test.tty}
if test.stdin {
opts.Stdin = &fakeReader{}
}
if test.stdout {
opts.Stdout = &fakeWriter{}
}
if test.stderr {
opts.Stderr = &fakeWriter{}
}
h := newStreamProtocolV2(opts).(*streamProtocolV2)
err := h.createStreams(conn)
if test.expectError {
if err == nil {
t.Errorf("%s: expected error", test.name)
continue
}
if e, a := test.stdinError, err; test.stdinError != nil && e != a {
t.Errorf("%s: expected %v, got %v", test.name, e, a)
}
if e, a := test.stdoutError, err; test.stdoutError != nil && e != a {
t.Errorf("%s: expected %v, got %v", test.name, e, a)
}
if e, a := test.stderrError, err; test.stderrError != nil && e != a {
t.Errorf("%s: expected %v, got %v", test.name, e, a)
}
if e, a := test.errorError, err; test.errorError != nil && e != a {
t.Errorf("%s: expected %v, got %v", test.name, e, a)
}
continue
}
if !test.expectError && err != nil {
t.Errorf("%s: unexpected error: %v", test.name, err)
continue
}
if test.stdin && !conn.created[api.StreamTypeStdin] {
t.Errorf("%s: expected stdin stream", test.name)
}
if test.stdout && !conn.created[api.StreamTypeStdout] {
t.Errorf("%s: expected stdout stream", test.name)
}
if test.stderr {
if test.tty && conn.created[api.StreamTypeStderr] {
t.Errorf("%s: unexpected stderr stream because tty is set", test.name)
} else if !test.tty && !conn.created[api.StreamTypeStderr] {
t.Errorf("%s: expected stderr stream", test.name)
}
}
if !conn.created[api.StreamTypeError] {
t.Errorf("%s: expected error stream", test.name)
}
}
}
func TestV2ErrorStreamReading(t *testing.T) {
tests := []struct {
name string
stream io.Reader
expectedError error
}{
{
name: "error reading from stream",
stream: &fakeReader{errors.New("foo")},
expectedError: errors.New("error reading from error stream: foo"),
},
{
name: "stream returns an error",
stream: strings.NewReader("some error"),
expectedError: errors.New("error executing remote command: some error"),
},
}
for _, test := range tests {
h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2)
h.errorStream = test.stream
ch := h.setupErrorStreamReading()
if ch == nil {
t.Fatalf("%s: unexpected nil channel", test.name)
}
var err error
select {
case err = <-ch:
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("%s: timed out", test.name)
}
if test.expectedError != nil {
if err == nil {
t.Errorf("%s: expected an error", test.name)
} else if e, a := test.expectedError, err; e.Error() != a.Error() {
t.Errorf("%s: expected %q, got %q", test.name, e, a)
}
continue
}
if test.expectedError == nil && err != nil {
t.Errorf("%s: unexpected error: %v", test.name, err)
continue
}
}
}

View File

@ -0,0 +1,108 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"encoding/json"
"io"
"net/http"
"sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/runtime"
)
// streamProtocolV3 implements version 3 of the streaming protocol for attach
// and exec. This version adds support for resizing the container's terminal.
type streamProtocolV3 struct {
*streamProtocolV2
resizeStream io.Writer
}
var _ streamProtocolHandler = &streamProtocolV3{}
func newStreamProtocolV3(options StreamOptions) streamProtocolHandler {
return &streamProtocolV3{
streamProtocolV2: newStreamProtocolV2(options).(*streamProtocolV2),
}
}
func (p *streamProtocolV3) createStreams(conn streamCreator) error {
// set up the streams from v2
if err := p.streamProtocolV2.createStreams(conn); err != nil {
return err
}
// set up resize stream
if p.Tty {
headers := http.Header{}
headers.Set(api.StreamType, api.StreamTypeResize)
var err error
p.resizeStream, err = conn.CreateStream(headers)
if err != nil {
return err
}
}
return nil
}
func (p *streamProtocolV3) handleResizes() {
if p.resizeStream == nil {
return
}
go func() {
defer runtime.HandleCrash()
encoder := json.NewEncoder(p.resizeStream)
for {
size := p.TerminalSizeQueue.Next()
if size == nil {
return
}
if err := encoder.Encode(&size); err != nil {
runtime.HandleError(err)
}
}
}()
}
func (p *streamProtocolV3) stream(conn streamCreator) error {
if err := p.createStreams(conn); err != nil {
return err
}
// now that all the streams have been created, proceed with reading & copying
errorChan := p.setupErrorStreamReading()
p.handleResizes()
p.copyStdin()
var wg sync.WaitGroup
p.copyStdout(&wg)
p.copyStderr(&wg)
// we're waiting for stdout/stderr to finish copying
wg.Wait()
// waits for errorStream to finish reading with an error or nil
return <-errorChan
}

View File

@ -77,18 +77,25 @@ func NewCmdAttach(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer)
// RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing
type RemoteAttach interface {
Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error
}
// DefaultRemoteAttach is the standard implementation of attaching
type DefaultRemoteAttach struct{}
func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error {
exec, err := remotecommand.NewExecutor(config, method, url)
if err != nil {
return err
}
return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty)
return exec.Stream(remotecommand.StreamOptions{
SupportedProtocols: remotecommandserver.SupportedStreamingProtocols,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
TerminalSizeQueue: terminalSizeQueue,
})
}
// AttachOptions declare the arguments accepted by the Exec command
@ -182,7 +189,10 @@ func (p *AttachOptions) Run() error {
pod := p.Pod
// ensure we can recover the terminal while attached
t := term.TTY{Parent: p.InterruptParent}
t := term.TTY{
Parent: p.InterruptParent,
Out: p.Out,
}
// check for TTY
tty := p.TTY
@ -196,17 +206,41 @@ func (p *AttachOptions) Run() error {
}
if p.Stdin {
t.In = p.In
if tty && !t.IsTerminal() {
if tty && !t.IsTerminalIn() {
tty = false
fmt.Fprintln(p.Err, "Unable to use a TTY - input is not a terminal or the right kind of file")
}
} else {
p.In = nil
}
t.Raw = tty
fn := func() error {
if tty {
fmt.Fprintln(p.Out, "\nHit enter for command prompt")
// save p.Err so we can print the command prompt message below
stderr := p.Err
var sizeQueue term.TerminalSizeQueue
if tty {
if size := t.GetSize(); size != nil {
// fake resizing +1 and then back to normal so that attach-detach-reattach will result in the
// screen being redrawn
sizePlusOne := *size
sizePlusOne.Width++
sizePlusOne.Height++
// this call spawns a goroutine to monitor/update the terminal size
sizeQueue = t.MonitorSize(&sizePlusOne, size)
}
// unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is
// true
p.Err = nil
}
fn := func() error {
if stderr != nil {
fmt.Fprintln(stderr, "If you don't see a command prompt, try pressing enter.")
}
// TODO: consider abstracting into a client invocation or client helper
req := p.Client.RESTClient.Post().
Resource("pods").
@ -215,13 +249,13 @@ func (p *AttachOptions) Run() error {
SubResource("attach")
req.VersionedParams(&api.PodAttachOptions{
Container: containerToAttach.Name,
Stdin: p.In != nil,
Stdin: p.Stdin,
Stdout: p.Out != nil,
Stderr: p.Err != nil,
TTY: tty,
}, api.ParameterCodec)
return p.Attach.Attach("POST", req.URL(), p.Config, p.In, p.Out, p.Err, tty)
return p.Attach.Attach("POST", req.URL(), p.Config, p.In, p.Out, p.Err, tty, sizeQueue)
}
if err := t.Safe(fn); err != nil {

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
"k8s.io/kubernetes/pkg/util/term"
)
type fakeRemoteAttach struct {
@ -40,7 +41,7 @@ type fakeRemoteAttach struct {
err error
}
func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error {
f.method = method
f.url = url
return f.err

View File

@ -20,11 +20,7 @@ import (
"fmt"
"io"
"net/url"
"os"
"os/signal"
"syscall"
"github.com/docker/docker/pkg/term"
"github.com/golang/glog"
"github.com/renstrom/dedent"
"github.com/spf13/cobra"
@ -34,6 +30,8 @@ import (
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/util/interrupt"
"k8s.io/kubernetes/pkg/util/term"
)
var (
@ -79,18 +77,25 @@ func NewCmdExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) *
// RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing
type RemoteExecutor interface {
Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error
}
// DefaultRemoteExecutor is the standard implementation of remote command execution
type DefaultRemoteExecutor struct{}
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error {
exec, err := remotecommand.NewExecutor(config, method, url)
if err != nil {
return err
}
return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty)
return exec.Stream(remotecommand.StreamOptions{
SupportedProtocols: remotecommandserver.SupportedStreamingProtocols,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
TerminalSizeQueue: terminalSizeQueue,
})
}
// ExecOptions declare the arguments accepted by the Exec command
@ -102,6 +107,9 @@ type ExecOptions struct {
TTY bool
Command []string
// InterruptParent, if set, is used to handle interrupts while attached
InterruptParent *interrupt.Handler
In io.Reader
Out io.Writer
Err io.Writer
@ -186,58 +194,58 @@ func (p *ExecOptions) Run() error {
containerName = pod.Spec.Containers[0].Name
}
// TODO: refactor with terminal helpers from the edit utility once that is merged
var stdin io.Reader
tty := p.TTY
if p.Stdin {
stdin = p.In
if tty {
if file, ok := stdin.(*os.File); ok {
inFd := file.Fd()
if term.IsTerminal(inFd) {
oldState, err := term.SetRawTerminal(inFd)
if err != nil {
glog.Fatal(err)
}
// this handles a clean exit, where the command finished
defer term.RestoreTerminal(inFd, oldState)
// SIGINT is handled by term.SetRawTerminal (it runs a goroutine that listens
// for SIGINT and restores the terminal before exiting)
// this handles SIGTERM
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM)
go func() {
<-sigChan
term.RestoreTerminal(inFd, oldState)
os.Exit(0)
}()
} else {
fmt.Fprintln(p.Err, "STDIN is not a terminal")
}
} else {
tty = false
fmt.Fprintln(p.Err, "Unable to use a TTY - input is not the right kind of file")
}
}
// ensure we can recover the terminal while attached
t := term.TTY{
Parent: p.InterruptParent,
Out: p.Out,
}
// TODO: consider abstracting into a client invocation or client helper
req := p.Client.RESTClient.Post().
Resource("pods").
Name(pod.Name).
Namespace(pod.Namespace).
SubResource("exec").
Param("container", containerName)
req.VersionedParams(&api.PodExecOptions{
Container: containerName,
Command: p.Command,
Stdin: stdin != nil,
Stdout: p.Out != nil,
Stderr: p.Err != nil,
TTY: tty,
}, api.ParameterCodec)
// check for TTY
tty := p.TTY
if p.Stdin {
t.In = p.In
if tty && !t.IsTerminalIn() {
tty = false
fmt.Fprintln(p.Err, "Unable to use a TTY - input is not a terminal or the right kind of file")
}
} else {
p.In = nil
}
t.Raw = tty
return p.Executor.Execute("POST", req.URL(), p.Config, stdin, p.Out, p.Err, tty)
var sizeQueue term.TerminalSizeQueue
if tty {
// this call spawns a goroutine to monitor/update the terminal size
sizeQueue = t.MonitorSize(t.GetSize())
// unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is
// true
p.Err = nil
}
fn := func() error {
// TODO: consider abstracting into a client invocation or client helper
req := p.Client.RESTClient.Post().
Resource("pods").
Name(pod.Name).
Namespace(pod.Namespace).
SubResource("exec").
Param("container", containerName)
req.VersionedParams(&api.PodExecOptions{
Container: containerName,
Command: p.Command,
Stdin: p.Stdin,
Stdout: p.Out != nil,
Stderr: p.Err != nil,
TTY: tty,
}, api.ParameterCodec)
return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.Err, tty, sizeQueue)
}
if err := t.Safe(fn); err != nil {
return err
}
return nil
}

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
"k8s.io/kubernetes/pkg/util/term"
)
type fakeRemoteExecutor struct {
@ -40,7 +41,7 @@ type fakeRemoteExecutor struct {
execErr error
}
func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error {
f.method = method
f.url = url
return f.execErr

View File

@ -0,0 +1,46 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/term"
)
// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each
// term.Size received from the channel. The resize channel must be closed elsewhere to stop the
// goroutine.
func HandleResizing(resize <-chan term.Size, resizeFunc func(size term.Size)) {
if resize == nil {
return
}
go func() {
defer runtime.HandleCrash()
for {
size, ok := <-resize
if !ok {
return
}
if size.Height < 1 || size.Width < 1 {
continue
}
resizeFunc(size)
}
}()
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/term"
"k8s.io/kubernetes/pkg/volume"
)
@ -126,7 +127,7 @@ type Runtime interface {
}
type ContainerAttacher interface {
AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) (err error)
AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error)
}
// CommandRunner encapsulates the command runner interfaces for testability.
@ -134,7 +135,7 @@ type ContainerCommandRunner interface {
// Runs the command in the container of the specified pod using nsenter.
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
// Forward the specified port from the specified pod to the stream.
PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error
}

View File

@ -27,6 +27,7 @@ import (
. "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/term"
"k8s.io/kubernetes/pkg/volume"
)
@ -273,7 +274,7 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS
return &status, f.Err
}
func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
f.Lock()
defer f.Unlock()
@ -281,7 +282,7 @@ func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, std
return f.Err
}
func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
f.Lock()
defer f.Unlock()

View File

@ -24,6 +24,7 @@ import (
. "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/term"
"k8s.io/kubernetes/pkg/volume"
)
@ -88,12 +89,12 @@ func (r *Mock) GetPodStatus(uid types.UID, name, namespace string) (*PodStatus,
return args.Get(0).(*PodStatus), args.Error(1)
}
func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
args := r.Called(containerID, cmd, stdin, stdout, stderr, tty)
return args.Error(0)
}
func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
args := r.Called(containerID, stdin, stdout, stderr, tty)
return args.Error(0)
}

View File

@ -77,6 +77,8 @@ type DockerInterface interface {
StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error
InspectExec(id string) (*dockertypes.ContainerExecInspect, error)
AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error
ResizeContainerTTY(id string, height, width int) error
ResizeExecTTY(id string, height, width int) error
}
// KubeletContainerName encapsulates a pod name and a Kubernetes container name.

View File

@ -63,6 +63,7 @@ import (
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
utilstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/util/term"
)
const (
@ -1098,7 +1099,7 @@ func (d *dockerExitError) ExitStatus() int {
}
// ExecInContainer runs the command inside the container identified by containerID.
func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
if dm.execHandler == nil {
return errors.New("unable to exec without an exec handler")
}
@ -1111,10 +1112,16 @@ func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID,
return fmt.Errorf("container not running (%s)", container.ID)
}
return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty)
return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty, resize)
}
func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
// Have to start this before the call to client.AttachToContainer because client.AttachToContainer is a blocking
// call :-( Otherwise, resize events don't get processed and the terminal never resizes.
kubecontainer.HandleResizing(resize, func(size term.Size) {
dm.client.ResizeContainerTTY(containerID.ID, int(size.Height), int(size.Width))
})
// TODO(random-liu): Do we really use the *Logs* field here?
opts := dockertypes.ContainerAttachOptions{
Stream: true,

View File

@ -26,18 +26,19 @@ import (
dockertypes "github.com/docker/engine-api/types"
"github.com/golang/glog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/term"
)
// ExecHandler knows how to execute a command in a running Docker container.
type ExecHandler interface {
ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error
ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
}
// NsenterExecHandler executes commands in Docker containers using nsenter.
type NsenterExecHandler struct{}
// TODO should we support nsenter in a container, running with elevated privs and --pid=host?
func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
nsenter, err := exec.LookPath("nsenter")
if err != nil {
return fmt.Errorf("exec unavailable - unable to locate nsenter")
@ -61,6 +62,10 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do
// make sure to close the stdout stream
defer stdout.Close()
kubecontainer.HandleResizing(resize, func(size term.Size) {
term.SetSize(p.Fd(), size)
})
if stdin != nil {
go io.Copy(p, stdin)
}
@ -98,7 +103,7 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do
// NativeExecHandler executes commands in Docker containers using Docker's exec API.
type NativeExecHandler struct{}
func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
createOpts := dockertypes.ExecConfig{
Cmd: cmd,
AttachStdin: stdin != nil,
@ -110,6 +115,13 @@ func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *doc
if err != nil {
return fmt.Errorf("failed to exec in container - Exec setup failed - %v", err)
}
// Have to start this before the call to client.StartExec because client.StartExec is a blocking
// call :-( Otherwise, resize events don't get processed and the terminal never resizes.
kubecontainer.HandleResizing(resize, func(size term.Size) {
client.ResizeExecTTY(execObj.ID, int(size.Height), int(size.Width))
})
startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
streamOpts := StreamOptions{
InputStream: stdin,
@ -121,6 +133,7 @@ func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *doc
if err != nil {
return err
}
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
count := 0

View File

@ -500,6 +500,20 @@ func (f *FakeDockerClient) updateContainerStatus(id, status string) {
}
}
func (f *FakeDockerClient) ResizeExecTTY(id string, height, width int) error {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "resize_exec")
return nil
}
func (f *FakeDockerClient) ResizeContainerTTY(id string, height, width int) error {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "resize_container")
return nil
}
// FakeDockerPuller is a stub implementation of DockerPuller.
type FakeDockerPuller struct {
sync.Mutex

View File

@ -213,3 +213,21 @@ func (in instrumentedDockerInterface) ImageHistory(id string) ([]dockertypes.Ima
recordError(operation, err)
return out, err
}
func (in instrumentedDockerInterface) ResizeExecTTY(id string, height, width int) error {
const operation = "resize_exec"
defer recordOperation(operation, time.Now())
err := in.client.ResizeExecTTY(id, height, width)
recordError(operation, err)
return err
}
func (in instrumentedDockerInterface) ResizeContainerTTY(id string, height, width int) error {
const operation = "resize_container"
defer recordOperation(operation, time.Now())
err := in.client.ResizeContainerTTY(id, height, width)
recordError(operation, err)
return err
}

View File

@ -454,6 +454,24 @@ func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.Contain
return d.holdHijackedConnection(sopts.RawTerminal, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
}
func (d *kubeDockerClient) ResizeExecTTY(id string, height, width int) error {
ctx, cancel := d.getCancelableContext()
defer cancel()
return d.client.ContainerExecResize(ctx, id, dockertypes.ResizeOptions{
Height: height,
Width: width,
})
}
func (d *kubeDockerClient) ResizeContainerTTY(id string, height, width int) error {
ctx, cancel := d.getCancelableContext()
defer cancel()
return d.client.ContainerResize(ctx, id, dockertypes.ResizeOptions{
Height: height,
Width: width,
})
}
// redirectResponseToOutputStream redirect the response stream to stdout and stderr. When tty is true, all stream will
// only be redirected to stdout.
func (d *kubeDockerClient) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error {

View File

@ -88,6 +88,7 @@ import (
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/selinux"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/term"
utilvalidation "k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/util/wait"
@ -3777,7 +3778,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false)
err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false, nil)
if err != nil {
return nil, err
}
@ -3787,7 +3788,7 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
@ -3797,12 +3798,12 @@ func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, contain
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty)
return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize)
}
// AttachContainer uses the container runtime to attach the given streams to
// the given container.
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
@ -3812,7 +3813,7 @@ func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, contain
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty)
return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize)
}
// PortForward connects to the pod's port and copies data between the port

View File

@ -74,6 +74,7 @@ import (
"k8s.io/kubernetes/pkg/util/rand"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/term"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
@ -1079,7 +1080,7 @@ type fakeContainerCommandRunner struct {
Stream io.ReadWriteCloser
}
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
f.Cmd = cmd
f.ID = id
f.Stdin = in
@ -2096,6 +2097,7 @@ func TestExecInContainerNoSuchPod(t *testing.T) {
nil,
nil,
false,
nil,
)
if err == nil {
t.Fatal("unexpected non-error")
@ -2140,6 +2142,7 @@ func TestExecInContainerNoSuchContainer(t *testing.T) {
nil,
nil,
false,
nil,
)
if err == nil {
t.Fatal("unexpected non-error")
@ -2200,6 +2203,7 @@ func TestExecInContainer(t *testing.T) {
stdout,
stderr,
tty,
nil,
)
if err != nil {
t.Fatalf("unexpected error: %s", err)

View File

@ -60,7 +60,7 @@ func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod
msg string
)
output := ioutils.WriteCloserWrapper(&buffer)
err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false)
err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false, nil)
if err != nil {
msg := fmt.Sprintf("Exec lifecycle hook (%v) for Container %q in Pod %q failed - %q", handler.Exec.Command, container.Name, format.Pod(pod), buffer.String())
glog.V(1).Infof(msg)

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/term"
)
func TestResolvePortInt(t *testing.T) {
@ -80,7 +81,7 @@ type fakeContainerCommandRunner struct {
ID kubecontainer.ContainerID
}
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
f.Cmd = cmd
f.ID = id
return nil

View File

@ -228,7 +228,7 @@ func (p *prober) newExecInContainer(container api.Container, containerID kubecon
return execInContainer{func() ([]byte, error) {
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false)
err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false, nil)
if err != nil {
return nil, err
}

View File

@ -60,6 +60,7 @@ import (
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/selinux"
utilstrings "k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/util/term"
utilwait "k8s.io/kubernetes/pkg/util/wait"
)
@ -2007,14 +2008,14 @@ func newRktExitError(e error) error {
return e
}
func (r *Runtime) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (r *Runtime) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
return fmt.Errorf("unimplemented")
}
// Note: In rkt, the container ID is in the form of "UUID:appName", where UUID is
// the rkt UUID, and appName is the container name.
// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
glog.V(4).Infof("Rkt execing in container.")
id, err := parseContainerID(containerID)
@ -2035,6 +2036,10 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s
// make sure to close the stdout stream
defer stdout.Close()
kubecontainer.HandleResizing(resize, func(size term.Size) {
term.SetSize(p.Fd(), size)
})
if stdin != nil {
go io.Copy(p, stdin)
}

View File

@ -25,13 +25,14 @@ import (
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/term"
)
// Attacher knows how to attach to a running container in a pod.
type Attacher interface {
// AttachContainer attaches to the running container in the pod, copying data between in/out/err
// and the container's stdin/stdout/stderr.
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
}
// ServeAttach handles requests to attach to a container. After creating/receiving the required
@ -44,7 +45,7 @@ func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, po
}
defer ctx.conn.Close()
err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty)
err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
if err != nil {
msg := fmt.Sprintf("error attaching to container: %v", err)
runtime.HandleError(errors.New(msg))

View File

@ -31,6 +31,11 @@ const (
// attachment/execution. It is the second version of the subprotocol and
// resolves the issues present in the first version.
StreamProtocolV2Name = "v2.channel.k8s.io"
// The SPDY subprotocol "v3.channel.k8s.io" is used for remote command
// attachment/execution. It is the third version of the subprotocol and
// adds support for resizing container terminals.
StreamProtocolV3Name = "v3.channel.k8s.io"
)
var SupportedStreamingProtocols = []string{StreamProtocolV2Name, StreamProtocolV1Name}
var SupportedStreamingProtocols = []string{StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name}

View File

@ -26,13 +26,14 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/term"
)
// Executor knows how to execute a command in a container in a pod.
type Executor interface {
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
}
// ServeExec handles requests to execute a command in a container. After
@ -48,7 +49,7 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podN
cmd := req.URL.Query()[api.ExecCommandParamm]
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty)
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
if err != nil {
msg := fmt.Sprintf("error executing command in container: %v", err)
runtime.HandleError(errors.New(msg))

View File

@ -17,6 +17,7 @@ limitations under the License.
package remotecommand
import (
"encoding/json"
"errors"
"fmt"
"io"
@ -27,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/term"
"k8s.io/kubernetes/pkg/util/wsstream"
"github.com/golang/glog"
@ -87,6 +89,8 @@ type context struct {
stdoutStream io.WriteCloser
stderrStream io.WriteCloser
errorStream io.WriteCloser
resizeStream io.ReadCloser
resizeChan chan term.Size
tty bool
}
@ -118,10 +122,26 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt
return nil, false
}
var ctx *context
var ok bool
if wsstream.IsWebSocketRequest(req) {
return createWebSocketStreams(req, w, opts, idleTimeout)
ctx, ok = createWebSocketStreams(req, w, opts, idleTimeout)
} else {
ctx, ok = createHttpStreamStreams(req, w, opts, supportedStreamProtocols, idleTimeout, streamCreationTimeout)
}
if !ok {
return nil, false
}
if ctx.resizeStream != nil {
ctx.resizeChan = make(chan term.Size)
go handleResizeEvents(ctx.resizeStream, ctx.resizeChan)
}
return ctx, true
}
func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
@ -148,6 +168,8 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt
var handler protocolHandler
switch protocol {
case StreamProtocolV3Name:
handler = &v3ProtocolHandler{}
case StreamProtocolV2Name:
handler = &v2ProtocolHandler{}
case "":
@ -157,6 +179,10 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt
handler = &v1ProtocolHandler{}
}
if opts.tty && handler.supportsTerminalResizing() {
opts.expectedStreams++
}
expired := time.NewTimer(streamCreationTimeout)
ctx, err := handler.waitForStreams(streamCh, opts.expectedStreams, expired.C)
@ -167,6 +193,7 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt
ctx.conn = conn
ctx.tty = opts.tty
return ctx, true
}
@ -174,8 +201,61 @@ type protocolHandler interface {
// waitForStreams waits for the expected streams or a timeout, returning a
// remoteCommandContext if all the streams were received, or an error if not.
waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error)
// supportsTerminalResizing returns true if the protocol handler supports terminal resizing
supportsTerminalResizing() bool
}
// v3ProtocolHandler implements the V3 protocol version for streaming command execution.
type v3ProtocolHandler struct{}
func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.errorStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeResize:
ctx.resizeStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, errors.New("timed out waiting for client to create streams")
}
}
return ctx, nil
}
// supportsTerminalResizing returns true because v3ProtocolHandler supports it
func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true }
// v2ProtocolHandler implements the V2 protocol version for streaming command execution.
type v2ProtocolHandler struct{}
@ -221,6 +301,9 @@ WaitForStreams:
return ctx, nil
}
// supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it.
func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false }
// v1ProtocolHandler implements the V1 protocol version for streaming command execution.
type v1ProtocolHandler struct{}
@ -275,3 +358,19 @@ WaitForStreams:
return ctx, nil
}
// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it.
func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false }
func handleResizeEvents(stream io.Reader, channel chan<- term.Size) {
defer runtime.HandleCrash()
decoder := json.NewDecoder(stream)
for {
size := term.Size{}
if err := decoder.Decode(&size); err != nil {
break
}
channel <- size
}
}

View File

@ -17,61 +17,82 @@ limitations under the License.
package remotecommand
import (
"fmt"
"net/http"
"time"
"k8s.io/kubernetes/pkg/httplog"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wsstream"
"github.com/golang/glog"
)
// standardShellChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
// along with the approximate duplex value. Supported subprotocols are "channel.k8s.io" and
// "base64.channel.k8s.io".
func standardShellChannels(stdin, stdout, stderr bool) []wsstream.ChannelType {
// open three half-duplex channels
channels := []wsstream.ChannelType{wsstream.ReadChannel, wsstream.WriteChannel, wsstream.WriteChannel}
if !stdin {
channels[0] = wsstream.IgnoreChannel
}
if !stdout {
channels[1] = wsstream.IgnoreChannel
}
if !stderr {
channels[2] = wsstream.IgnoreChannel
}
const (
stdinChannel = iota
stdoutChannel
stderrChannel
errorChannel
resizeChannel
)
// createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
// along with the approximate duplex value. It also creates the error (3) and resize (4) channels.
func createChannels(opts *options) []wsstream.ChannelType {
// open the requested channels, and always open the error channel
channels := make([]wsstream.ChannelType, 5)
channels[stdinChannel] = readChannel(opts.stdin)
channels[stdoutChannel] = writeChannel(opts.stdout)
channels[stderrChannel] = writeChannel(opts.stderr)
channels[errorChannel] = wsstream.WriteChannel
channels[resizeChannel] = wsstream.ReadChannel
return channels
}
// createWebSocketStreams returns a remoteCommandContext containing the websocket connection and
// readChannel returns wsstream.ReadChannel if real is true, or wsstream.IgnoreChannel.
func readChannel(real bool) wsstream.ChannelType {
if real {
return wsstream.ReadChannel
}
return wsstream.IgnoreChannel
}
// writeChannel returns wsstream.WriteChannel if real is true, or wsstream.IgnoreChannel.
func writeChannel(real bool) wsstream.ChannelType {
if real {
return wsstream.WriteChannel
}
return wsstream.IgnoreChannel
}
// createWebSocketStreams returns a context containing the websocket connection and
// streams needed to perform an exec or an attach.
func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *options, idleTimeout time.Duration) (*context, bool) {
// open the requested channels, and always open the error channel
channels := append(standardShellChannels(opts.stdin, opts.stdout, opts.stderr), wsstream.WriteChannel)
channels := createChannels(opts)
conn := wsstream.NewConn(channels...)
conn.SetIdleTimeout(idleTimeout)
streams, err := conn.Open(httplog.Unlogged(w), req)
if err != nil {
glog.Errorf("Unable to upgrade websocket connection: %v", err)
runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err))
return nil, false
}
// Send an empty message to the lowest writable channel to notify the client the connection is established
// TODO: make generic to SPDY and WebSockets and do it outside of this method?
switch {
case opts.stdout:
streams[1].Write([]byte{})
streams[stdoutChannel].Write([]byte{})
case opts.stderr:
streams[2].Write([]byte{})
streams[stderrChannel].Write([]byte{})
default:
streams[3].Write([]byte{})
streams[errorChannel].Write([]byte{})
}
return &context{
conn: conn,
stdinStream: streams[0],
stdoutStream: streams[1],
stderrStream: streams[2],
errorStream: streams[3],
stdinStream: streams[stdinChannel],
stdoutStream: streams[stdoutChannel],
stderrStream: streams[stderrChannel],
errorStream: streams[errorChannel],
tty: opts.tty,
resizeStream: streams[resizeChannel],
}, true
}

View File

@ -58,6 +58,7 @@ import (
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/limitwriter"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/term"
"k8s.io/kubernetes/pkg/volume"
)
@ -160,8 +161,8 @@ type HostInterface interface {
GetRunningPods() ([]*api.Pod, error)
GetPodByName(namespace, name string) (*api.Pod, bool)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error
ServeLogs(w http.ResponseWriter, req *http.Request)
PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error

View File

@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/term"
"k8s.io/kubernetes/pkg/volume"
)
@ -119,11 +120,11 @@ func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, contain
return fk.runFunc(podFullName, uid, containerName, cmd)
}
func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
return fk.execFunc(name, uid, container, cmd, in, out, err, tty)
}
func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error {
func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
return fk.attachFunc(name, uid, container, in, out, err, tty)
}

147
pkg/util/term/resize.go Normal file
View File

@ -0,0 +1,147 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package term
import (
"fmt"
"github.com/docker/docker/pkg/term"
"k8s.io/kubernetes/pkg/util/runtime"
)
// Size represents the width and height of a terminal.
type Size struct {
Width uint16
Height uint16
}
// GetSize returns the current size of the user's terminal. If it isn't a terminal,
// nil is returned.
func (t TTY) GetSize() *Size {
if !t.IsTerminalOut() {
return nil
}
return GetSize(t.Out.(fd).Fd())
}
// GetSize returns the current size of the terminal associated with fd.
func GetSize(fd uintptr) *Size {
winsize, err := term.GetWinsize(fd)
if err != nil {
runtime.HandleError(fmt.Errorf("unable to get terminal size: %v", err))
return nil
}
return &Size{Width: winsize.Width, Height: winsize.Height}
}
// SetSize sets the terminal size associated with fd.
func SetSize(fd uintptr, size Size) error {
return term.SetWinsize(fd, &term.Winsize{Height: size.Height, Width: size.Width})
}
// MonitorSize monitors the terminal's size. It returns a TerminalSizeQueue primed with
// initialSizes, or nil if there's no TTY present.
func (t *TTY) MonitorSize(initialSizes ...*Size) TerminalSizeQueue {
if !t.IsTerminalOut() {
return nil
}
t.sizeQueue = &sizeQueue{
t: *t,
// make it buffered so we can send the initial terminal sizes without blocking, prior to starting
// the streaming below
resizeChan: make(chan Size, len(initialSizes)),
stopResizing: make(chan struct{}),
}
t.sizeQueue.monitorSize(initialSizes...)
return t.sizeQueue
}
// TerminalSizeQueue is capable of returning terminal resize events as they occur.
type TerminalSizeQueue interface {
// Next returns the new terminal size after the terminal has been resized. It returns nil when
// monitoring has been stopped.
Next() *Size
}
// sizeQueue implements TerminalSizeQueue
type sizeQueue struct {
t TTY
// resizeChan receives a Size each time the user's terminal is resized.
resizeChan chan Size
stopResizing chan struct{}
}
// make sure sizeQueue implements the TerminalSizeQueue interface
var _ TerminalSizeQueue = &sizeQueue{}
// monitorSize primes resizeChan with initialSizes and then monitors for resize events. With each
// new event, it sends the current terminal size to resizeChan.
func (s *sizeQueue) monitorSize(initialSizes ...*Size) {
// send the initial sizes
for i := range initialSizes {
if initialSizes[i] != nil {
s.resizeChan <- *initialSizes[i]
}
}
resizeEvents := make(chan Size, 1)
monitorResizeEvents(s.t.Out.(fd).Fd(), resizeEvents, s.stopResizing)
// listen for resize events in the background
go func() {
defer runtime.HandleCrash()
for {
select {
case size, ok := <-resizeEvents:
if !ok {
return
}
select {
// try to send the size to resizeChan, but don't block
case s.resizeChan <- size:
// send successful
default:
// unable to send / no-op
}
case <-s.stopResizing:
return
}
}
}()
}
// Next returns the new terminal size after the terminal has been resized. It returns nil when
// monitoring has been stopped.
func (s *sizeQueue) Next() *Size {
size, ok := <-s.resizeChan
if !ok {
return nil
}
return &size
}
// stop stops the background goroutine that is monitoring for terminal resizes.
func (s *sizeQueue) stop() {
close(s.stopResizing)
}

View File

@ -0,0 +1,60 @@
// +build !windows
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package term
import (
"os"
"os/signal"
"syscall"
"k8s.io/kubernetes/pkg/util/runtime"
)
// monitorResizeEvents spawns a goroutine that waits for SIGWINCH signals (these indicate the
// terminal has resized). After receiving a SIGWINCH, this gets the terminal size and tries to send
// it to the resizeEvents channel. The goroutine stops when the stop channel is closed.
func monitorResizeEvents(fd uintptr, resizeEvents chan<- Size, stop chan struct{}) {
go func() {
defer runtime.HandleCrash()
winch := make(chan os.Signal, 1)
signal.Notify(winch, syscall.SIGWINCH)
defer signal.Stop(winch)
for {
select {
case <-winch:
size := GetSize(fd)
if size == nil {
return
}
// try to send size
select {
case resizeEvents <- *size:
// success
default:
// not sent
}
case <-stop:
return
}
}
}()
}

View File

@ -0,0 +1,57 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package term
import (
"time"
"k8s.io/kubernetes/pkg/util/runtime"
)
// monitorResizeEvents spawns a goroutine that periodically gets the terminal size and tries to send
// it to the resizeEvents channel if the size has changed. The goroutine stops when the stop channel
// is closed.
func monitorResizeEvents(fd uintptr, resizeEvents chan<- Size, stop chan struct{}) {
go func() {
defer runtime.HandleCrash()
var lastSize Size
for {
// see if we need to stop running
select {
case <-stop:
return
default:
}
size := GetSize(fd)
if size == nil {
return
}
if size.Height != lastSize.Height || size.Width != lastSize.Width {
lastSize.Height = size.Height
lastSize.Width = size.Width
resizeEvents <- *size
}
// sleep to avoid hot looping
time.Sleep(250 * time.Millisecond)
}
}()
}

View File

@ -21,17 +21,22 @@ import (
"os"
"github.com/docker/docker/pkg/term"
"k8s.io/kubernetes/pkg/util/interrupt"
)
// SafeFunc is a function to be invoked by TTY.
type SafeFunc func() error
// TTY helps invoke a function and preserve the state of the terminal, even if the
// process is terminated during execution.
// TTY helps invoke a function and preserve the state of the terminal, even if the process is
// terminated during execution. It also provides support for terminal resizing for remote command
// execution/attachment.
type TTY struct {
// In is a reader to check for a terminal.
// In is a reader representing stdin. It is a required field.
In io.Reader
// Out is a writer representing stdout. It must be set to support terminal resizing. It is an
// optional field.
Out io.Writer
// Raw is true if the terminal should be set raw.
Raw bool
// TryDev indicates the TTY should try to open /dev/tty if the provided input
@ -41,6 +46,10 @@ type TTY struct {
// it will be invoked after the terminal state is restored. If it is not provided,
// a signal received during the TTY will result in os.Exit(0) being invoked.
Parent *interrupt.Handler
// sizeQueue is set after a call to MonitorSize() and is used to monitor SIGWINCH signals when the
// user's terminal resizes.
sizeQueue *sizeQueue
}
// fd returns a file descriptor for a given object.
@ -48,10 +57,22 @@ type fd interface {
Fd() uintptr
}
// IsTerminal returns true if the provided input is a terminal. Does not check /dev/tty
// IsTerminalIn returns true if t.In is a terminal. Does not check /dev/tty
// even if TryDev is set.
func (t TTY) IsTerminal() bool {
return IsTerminal(t.In)
func (t TTY) IsTerminalIn() bool {
return isTerminal(t.In)
}
// IsTerminalOut returns true if t.Out is a terminal. Does not check /dev/tty
// even if TryDev is set.
func (t TTY) IsTerminalOut() bool {
return isTerminal(t.Out)
}
// isTerminal returns whether the passed object is a terminal or not
func isTerminal(i interface{}) bool {
file, ok := i.(fd)
return ok && term.IsTerminal(file.Fd())
}
// Safe invokes the provided function and will attempt to ensure that when the
@ -90,11 +111,11 @@ func (t TTY) Safe(fn SafeFunc) error {
if err != nil {
return err
}
return interrupt.Chain(t.Parent, func() { term.RestoreTerminal(inFd, state) }).Run(fn)
}
return interrupt.Chain(t.Parent, func() {
if t.sizeQueue != nil {
t.sizeQueue.stop()
}
// IsTerminal returns whether the passed io.Reader is a terminal or not
func IsTerminal(r io.Reader) bool {
file, ok := r.(fd)
return ok && term.IsTerminal(file.Fd())
term.RestoreTerminal(inFd, state)
}).Run(fn)
}

View File

@ -34,7 +34,13 @@ func execute(method string, url *url.URL, config *restclient.Config, stdin io.Re
if err != nil {
return err
}
return exec.Stream(remotecommandserver.SupportedStreamingProtocols, stdin, stdout, stderr, tty)
return exec.Stream(remotecommand.StreamOptions{
SupportedProtocols: remotecommandserver.SupportedStreamingProtocols,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
})
}
func execCommandInContainer(config *restclient.Config, c *client.Client, ns, podName, containerName string, cmd []string) (string, error) {