Separate Direct and Indirect streaming paths, implement indirect path with CRI

pull/6/head
Tim St. Clair 2016-11-02 17:42:00 -07:00
parent c83a924628
commit 86d849e374
No known key found for this signature in database
GPG Key ID: 434D16BCEF479EAB
28 changed files with 723 additions and 436 deletions

View File

@ -76,12 +76,12 @@ go_library(
"//pkg/kubelet/remote:go_default_library",
"//pkg/kubelet/rkt:go_default_library",
"//pkg/kubelet/server:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/sysctl:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/kubelet/util/queue:go_default_library",
"//pkg/kubelet/util/sliceutils:go_default_library",
"//pkg/kubelet/volumemanager:go_default_library",
@ -173,6 +173,7 @@ go_test(
"//pkg/kubelet/pod/testing:go_default_library",
"//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/prober/testing:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/types:go_default_library",
@ -190,7 +191,6 @@ go_test(
"//pkg/util/rand:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/term:go_default_library",
"//pkg/util/testing:go_default_library",
"//pkg/util/uuid:go_default_library",
"//pkg/util/wait:go_default_library",

View File

@ -33,6 +33,7 @@ go_library(
"//pkg/client/record:go_default_library",
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/errors:go_default_library",

View File

@ -17,6 +17,7 @@ limitations under the License.
package container
import (
"bytes"
"fmt"
"hash/adler32"
"strings"
@ -28,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
hashutil "k8s.io/kubernetes/pkg/util/hash"
@ -223,3 +225,23 @@ func FormatPod(pod *Pod) string {
// (DNS subdomain format), while allowed in the container name format.
return fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.ID)
}
type containerCommandRunnerWrapper struct {
DirectStreamingRuntime
}
var _ ContainerCommandRunner = &containerCommandRunnerWrapper{}
func DirectStreamingRunner(runtime DirectStreamingRuntime) ContainerCommandRunner {
return &containerCommandRunnerWrapper{runtime}
}
func (r *containerCommandRunnerWrapper) RunInContainer(id ContainerID, cmd []string) ([]byte, error) {
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err := r.ExecInContainer(id, cmd, nil, output, output, false, nil)
// Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but
// the command returned a nonzero exit code). Therefore, always return the output along with the
// error.
return buffer.Bytes(), err
}

View File

@ -19,6 +19,7 @@ package container
import (
"fmt"
"io"
"net/url"
"reflect"
"strings"
"time"
@ -112,10 +113,6 @@ type Runtime interface {
GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
// Delete a container. If the container is still running, an error is returned.
DeleteContainer(containerID ContainerID) error
// ContainerCommandRunner encapsulates the command runner interfaces for testability.
ContainerCommandRunner
// ContainerAttach encapsulates the attaching to containers for testability
ContainerAttacher
// ImageService provides methods to image-related methods.
ImageService
// UpdatePodCIDR sends a new podCIDR to the runtime.
@ -124,6 +121,28 @@ type Runtime interface {
UpdatePodCIDR(podCIDR string) error
}
// DirectStreamingRuntime is the interface implemented by runtimes for which the streaming calls
// (exec/attach/port-forward) should be served directly by the Kubelet.
type DirectStreamingRuntime interface {
// Runs the command in the container of the specified pod using nsenter.
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
// Forward the specified port from the specified pod to the stream.
PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error
// ContainerAttach encapsulates the attaching to containers for testability
ContainerAttacher
}
// IndirectStreamingRuntime is the interface implemented by runtimes that handle the serving of the
// streaming calls (exec/attach/port-forward) themselves. In this case, Kubelet should redirect to
// the runtime server.
type IndirectStreamingRuntime interface {
GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetAttach(id ContainerID, stdin, stdout, stderr bool) (*url.URL, error)
GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error)
}
type ImageService interface {
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary.
@ -142,14 +161,10 @@ type ContainerAttacher interface {
AttachContainer(id ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error)
}
// CommandRunner encapsulates the command runner interfaces for testability.
type ContainerCommandRunner interface {
// Runs the command in the container of the specified pod using nsenter.
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
// Forward the specified port from the specified pod to the stream.
PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error
// RunInContainer synchronously executes the command in the container, and returns the output.
// If the command completes with a non-0 exit code, a pkg/util/exec.ExitError will be returned.
RunInContainer(id ContainerID, cmd []string) ([]byte, error)
}
// Pod is a group of containers.

View File

@ -19,6 +19,7 @@ package testing
import (
"fmt"
"io"
"net/url"
"reflect"
"sync"
"time"
@ -57,6 +58,31 @@ type FakeRuntime struct {
StatusErr error
}
type FakeDirectStreamingRuntime struct {
*FakeRuntime
// Arguments to streaming method calls.
Args struct {
// Attach / Exec args
ContainerID ContainerID
Cmd []string
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
TTY bool
// Port-forward args
Pod *Pod
Port uint16
Stream io.ReadWriteCloser
}
}
const FakeHost = "localhost:12345"
type FakeIndirectStreamingRuntime struct {
*FakeRuntime
}
// FakeRuntime should implement Runtime.
var _ Runtime = &FakeRuntime{}
@ -279,19 +305,32 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS
return &status, f.Err
}
func (f *FakeRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (f *FakeDirectStreamingRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "ExecInContainer")
f.Args.ContainerID = containerID
f.Args.Cmd = cmd
f.Args.Stdin = stdin
f.Args.Stdout = stdout
f.Args.Stderr = stderr
f.Args.TTY = tty
return f.Err
}
func (f *FakeRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (f *FakeDirectStreamingRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "AttachContainer")
f.Args.ContainerID = containerID
f.Args.Stdin = stdin
f.Args.Stdout = stdout
f.Args.Stderr = stderr
f.Args.TTY = tty
return f.Err
}
@ -349,11 +388,15 @@ func (f *FakeRuntime) RemoveImage(image ImageSpec) error {
return f.Err
}
func (f *FakeRuntime) PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error {
func (f *FakeDirectStreamingRuntime) PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "PortForward")
f.Args.Pod = pod
f.Args.Port = port
f.Args.Stream = stream
return f.Err
}
@ -405,3 +448,47 @@ func (f *FakeRuntime) ImageStats() (*ImageStats, error) {
f.CalledFunctions = append(f.CalledFunctions, "ImageStats")
return nil, f.Err
}
func (f *FakeIndirectStreamingRuntime) GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetExec")
return &url.URL{Host: FakeHost}, f.Err
}
func (f *FakeIndirectStreamingRuntime) GetAttach(id ContainerID, stdin, stdout, stderr bool) (*url.URL, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetAttach")
return &url.URL{Host: FakeHost}, f.Err
}
func (f *FakeIndirectStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "GetPortForward")
return &url.URL{Host: FakeHost}, f.Err
}
type FakeContainerCommandRunner struct {
// what to return
Stdout string
Err error
// actual values when invoked
ContainerID ContainerID
Cmd []string
}
var _ ContainerCommandRunner = &FakeContainerCommandRunner{}
func (f *FakeContainerCommandRunner) RunInContainer(containerID ContainerID, cmd []string) ([]byte, error) {
// record invoked values
f.ContainerID = containerID
f.Cmd = cmd
return []byte(f.Stdout), f.Err
}

View File

@ -107,8 +107,9 @@ const (
)
var (
// DockerManager implements the Runtime interface.
// DockerManager implements the Runtime and DirectStreamingRuntime interfaces.
_ kubecontainer.Runtime = &DockerManager{}
_ kubecontainer.DirectStreamingRuntime = &DockerManager{}
// TODO: make this a TTL based pull (if image older than X policy, pull)
podInfraContainerImagePullPolicy = api.PullIfNotPresent
@ -281,7 +282,8 @@ func NewDockerManager(
imageStatsProvider: newImageStatsProvider(client),
seccompProfileRoot: seccompProfileRoot,
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
cmdRunner := kubecontainer.DirectStreamingRunner(dm)
dm.runner = lifecycle.NewHandlerRunner(httpClient, cmdRunner, dm)
dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls, qps, burst)
dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir)

View File

@ -551,7 +551,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
// becomes the default.
klet.networkPlugin = nil
klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
containerRefManager,
@ -582,9 +582,11 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
if err != nil {
return nil, err
}
klet.containerRuntime = runtime
klet.runner = runtime
default:
// Only supported one for now, continue.
klet.containerRuntime = dockertools.NewDockerManager(
runtime := dockertools.NewDockerManager(
kubeDeps.DockerClient,
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
@ -616,6 +618,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
kubeCfg.SeccompProfileRoot,
kubeDeps.ContainerRuntimeOptions...,
)
klet.containerRuntime = runtime
klet.runner = kubecontainer.DirectStreamingRunner(runtime)
}
case "rkt":
// TODO: Include hairpin mode settings in rkt?
@ -647,6 +651,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
return nil, err
}
klet.containerRuntime = rktRuntime
klet.runner = kubecontainer.DirectStreamingRunner(rktRuntime)
case "remote":
remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
@ -656,7 +661,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
if err != nil {
return nil, err
}
klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
containerRefManager,
@ -677,6 +682,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
if err != nil {
return nil, err
}
klet.containerRuntime = runtime
klet.runner = runtime
default:
return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
}
@ -703,7 +710,6 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
}
klet.imageManager = imageManager
klet.runner = klet.containerRuntime
klet.statusManager = status.NewManager(kubeClient, klet.podManager)
klet.probeManager = prober.NewManager(

View File

@ -22,6 +22,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
@ -38,10 +39,10 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets"
@ -1186,14 +1187,106 @@ func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, container
if err != nil {
return nil, err
}
podUID = kl.podManager.TranslatePodUID(podUID)
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
return pod.FindContainerByName(containerName), nil
}
// Run a command in a container, returns the combined stdout, stderr as an array of bytes
func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containerName string, cmd []string) ([]byte, error) {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName)
}
return kl.runner.RunInContainer(container.ID, cmd)
}
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize)
}
// AttachContainer uses the container runtime to attach the given streams to
// the given container.
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize)
}
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return err
}
podUID = kl.podManager.TranslatePodUID(podUID)
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return fmt.Errorf("pod not found (%q)", podFullName)
}
return streamingRuntime.PortForward(&pod, port, stream)
}
// GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommand.Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the exec directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY)
default:
return nil, fmt.Errorf("container runtime does not support exec")
}
}
// GetAttach gets the URL the attach will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommand.Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the attach directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
@ -1202,59 +1295,34 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
return nil, fmt.Errorf("container not found (%q)", containerName)
}
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false, nil)
// Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but
// the command returned a nonzero exit code). Therefore, always return the output along with the
// error.
return buffer.Bytes(), err
return streamingRuntime.GetAttach(container.ID, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr)
default:
return nil, fmt.Errorf("container runtime does not support attach")
}
}
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize)
}
// AttachContainer uses the container runtime to attach the given streams to
// the given container.
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize)
}
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error {
podUID = kl.podManager.TranslatePodUID(podUID)
// GetPortForward gets the URL the port-forward will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the attach directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return err
return nil, err
}
podUID = kl.podManager.TranslatePodUID(podUID)
podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return fmt.Errorf("pod not found (%q)", podFullName)
return nil, fmt.Errorf("pod not found (%q)", podFullName)
}
return streamingRuntime.GetPortForward(podName, podNamespace, podUID)
default:
return nil, fmt.Errorf("container runtime does not support port-forward")
}
return kl.runner.PortForward(&pod, port, stream)
}
// cleanupOrphanedPodCgroups removes the Cgroups of pods that should not be

View File

@ -19,8 +19,6 @@ package kubelet
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"sort"
"testing"
@ -31,9 +29,9 @@ import (
"k8s.io/kubernetes/pkg/apimachinery/registered"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/term"
)
func TestMakeMounts(t *testing.T) {
@ -111,47 +109,6 @@ func TestMakeMounts(t *testing.T) {
assert.Equal(t, expectedMounts, mounts, "mounts of container %+v", container)
}
type fakeContainerCommandRunner struct {
// what was passed in
Cmd []string
ID kubecontainer.ContainerID
PodID types.UID
E error
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
TTY bool
Port uint16
Stream io.ReadWriteCloser
// what to return
StdoutData string
StderrData string
}
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
// record params
f.Cmd = cmd
f.ID = id
f.Stdin = in
f.Stdout = out
f.Stderr = err
f.TTY = tty
// Copy stdout/stderr data
fmt.Fprint(out, f.StdoutData)
fmt.Fprint(out, f.StderrData)
return f.E
}
func (f *fakeContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
f.PodID = pod.ID
f.Port = port
f.Stream = stream
return nil
}
func TestRunInContainerNoSuchPod(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
@ -171,14 +128,13 @@ func TestRunInContainerNoSuchPod(t *testing.T) {
}
func TestRunInContainer(t *testing.T) {
for _, testError := range []error{nil, errors.New("foo")} {
for _, testError := range []error{nil, errors.New("bar")} {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeCommandRunner := fakeContainerCommandRunner{
E: testError,
StdoutData: "foo",
StderrData: "bar",
fakeCommandRunner := containertest.FakeContainerCommandRunner{
Err: testError,
Stdout: "foo",
}
kubelet.runner = &fakeCommandRunner
@ -197,11 +153,11 @@ func TestRunInContainer(t *testing.T) {
}
cmd := []string{"ls"}
actualOutput, err := kubelet.RunInContainer("podFoo_nsFoo", "", "containerFoo", cmd)
assert.Equal(t, containerID, fakeCommandRunner.ID, "(testError=%v) ID", testError)
assert.Equal(t, containerID, fakeCommandRunner.ContainerID, "(testError=%v) ID", testError)
assert.Equal(t, cmd, fakeCommandRunner.Cmd, "(testError=%v) command", testError)
// this isn't 100% foolproof as a bug in a real ContainerCommandRunner where it fails to copy to stdout/stderr wouldn't be caught by this test
assert.Equal(t, "foobar", string(actualOutput), "(testError=%v) output", testError)
assert.Equal(t, fmt.Sprintf("%s", err), fmt.Sprintf("%s", testError), "(testError=%v) err", testError)
assert.Equal(t, "foo", string(actualOutput), "(testError=%v) output", testError)
assert.Equal(t, err, testError, "(testError=%v) err", testError)
}
}
@ -1051,73 +1007,6 @@ func TestPodPhaseWithRestartOnFailure(t *testing.T) {
}
}
func TestExecInContainerNoSuchPod(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
fakeRuntime.PodList = []*containertest.FakePod{}
podName := "podFoo"
podNamespace := "nsFoo"
containerID := "containerFoo"
err := kubelet.ExecInContainer(
kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
"",
containerID,
[]string{"ls"},
nil,
nil,
nil,
false,
nil,
)
require.Error(t, err)
require.True(t, fakeCommandRunner.ID.IsEmpty(), "Unexpected invocation of runner.ExecInContainer")
}
func TestExecInContainerNoSuchContainer(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
podNamespace := "nsFoo"
containerID := "containerFoo"
fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
Name: podName,
Namespace: podNamespace,
Containers: []*kubecontainer.Container{
{Name: "bar",
ID: kubecontainer.ContainerID{Type: "test", ID: "barID"}},
},
}},
}
err := kubelet.ExecInContainer(
kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: podName,
Namespace: podNamespace,
}}),
"",
containerID,
[]string{"ls"},
nil,
nil,
nil,
false,
nil,
)
require.Error(t, err)
require.True(t, fakeCommandRunner.ID.IsEmpty(), "Unexpected invocation of runner.ExecInContainer")
}
type fakeReadWriteCloser struct{}
func (f *fakeReadWriteCloser) Write(data []byte) (int, error) {
@ -1132,24 +1021,49 @@ func (f *fakeReadWriteCloser) Close() error {
return nil
}
func TestExecInContainer(t *testing.T) {
func TestExec(t *testing.T) {
const (
podName = "podFoo"
podNamespace = "nsFoo"
podUID types.UID = "12345678"
containerID = "containerFoo"
tty = true
)
var (
podFullName = kubecontainer.GetPodFullName(podWithUidNameNs(podUID, podName, podNamespace))
command = []string{"ls"}
stdin = &bytes.Buffer{}
stdout = &fakeReadWriteCloser{}
stderr = &fakeReadWriteCloser{}
)
testcases := []struct {
description string
podFullName string
container string
expectError bool
}{{
description: "success case",
podFullName: podFullName,
container: containerID,
}, {
description: "no such pod",
podFullName: "bar" + podFullName,
container: containerID,
expectError: true,
}, {
description: "no such container",
podFullName: podFullName,
container: "containerBar",
expectError: true,
}}
for _, tc := range testcases {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
podName := "podFoo"
podNamespace := "nsFoo"
containerID := "containerFoo"
command := []string{"ls"}
stdin := &bytes.Buffer{}
stdout := &fakeReadWriteCloser{}
stderr := &fakeReadWriteCloser{}
tty := true
fakeRuntime.PodList = []*containertest.FakePod{
testKubelet.fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "12345678",
ID: podUID,
Name: podName,
Namespace: podNamespace,
Containers: []*kubecontainer.Container{
@ -1160,88 +1074,142 @@ func TestExecInContainer(t *testing.T) {
}},
}
err := kubelet.ExecInContainer(
kubecontainer.GetPodFullName(podWithUidNameNs("12345678", podName, podNamespace)),
"",
containerID,
[]string{"ls"},
stdin,
stdout,
stderr,
tty,
nil,
)
require.NoError(t, err)
require.Equal(t, fakeCommandRunner.ID.ID, containerID, "ID")
require.Equal(t, fakeCommandRunner.Cmd, command, "Command")
require.Equal(t, fakeCommandRunner.Stdin, stdin, "Stdin")
require.Equal(t, fakeCommandRunner.Stdout, stdout, "Stdout")
require.Equal(t, fakeCommandRunner.Stderr, stderr, "Stderr")
require.Equal(t, fakeCommandRunner.TTY, tty, "TTY")
}
{ // No streaming case
description := "no streaming - " + tc.description
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
assert.Error(t, err, description)
assert.Nil(t, redirect, description)
func TestPortForwardNoSuchPod(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
fakeRuntime.PodList = []*containertest.FakePod{}
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil)
assert.Error(t, err, description)
}
{ // Direct streaming case
description := "direct streaming - " + tc.description
fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
podName := "podFoo"
podNamespace := "nsFoo"
var port uint16 = 5000
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
assert.NoError(t, err, description)
assert.Nil(t, redirect, description)
err := kubelet.PortForward(
kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{Name: podName, Namespace: podNamespace}}),
"",
port,
nil,
)
require.Error(t, err)
require.True(t, fakeCommandRunner.ID.IsEmpty(), "unexpected invocation of runner.PortForward")
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil)
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
assert.Equal(t, fakeRuntime.Args.ContainerID.ID, containerID, description+": ID")
assert.Equal(t, fakeRuntime.Args.Cmd, command, description+": Command")
assert.Equal(t, fakeRuntime.Args.Stdin, stdin, description+": Stdin")
assert.Equal(t, fakeRuntime.Args.Stdout, stdout, description+": Stdout")
assert.Equal(t, fakeRuntime.Args.Stderr, stderr, description+": Stderr")
assert.Equal(t, fakeRuntime.Args.TTY, tty, description+": TTY")
}
}
{ // Indirect streaming case
description := "indirect streaming - " + tc.description
fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
}
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil)
assert.Error(t, err, description)
}
}
}
func TestPortForward(t *testing.T) {
const (
podName = "podFoo"
podNamespace = "nsFoo"
podUID types.UID = "12345678"
port uint16 = 5000
)
var (
stream = &fakeReadWriteCloser{}
)
testcases := []struct {
description string
podName string
expectError bool
}{{
description: "success case",
podName: podName,
}, {
description: "no such pod",
podName: "bar",
expectError: true,
}}
for _, tc := range testcases {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
fakeRuntime := testKubelet.fakeRuntime
podName := "podFoo"
podNamespace := "nsFoo"
podID := types.UID("12345678")
fakeRuntime.PodList = []*containertest.FakePod{
testKubelet.fakeRuntime.PodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: podID,
ID: podUID,
Name: podName,
Namespace: podNamespace,
Containers: []*kubecontainer.Container{
{
Name: "foo",
ID: kubecontainer.ContainerID{Type: "test", ID: "containerFoo"},
{Name: "foo",
ID: kubecontainer.ContainerID{Type: "test", ID: "foo"},
},
},
}},
}
fakeCommandRunner := fakeContainerCommandRunner{}
kubelet.runner = &fakeCommandRunner
var port uint16 = 5000
stream := &fakeReadWriteCloser{}
err := kubelet.PortForward(
kubecontainer.GetPodFullName(&api.Pod{ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: podName,
Namespace: podNamespace,
}}),
"",
port,
stream,
)
require.NoError(t, err)
require.Equal(t, fakeCommandRunner.PodID, podID, "Pod ID")
require.Equal(t, fakeCommandRunner.Port, port, "Port")
require.Equal(t, fakeCommandRunner.Stream, stream, "stream")
podFullName := kubecontainer.GetPodFullName(podWithUidNameNs(podUID, tc.podName, podNamespace))
{ // No streaming case
description := "no streaming - " + tc.description
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID)
assert.Error(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.PortForward(podFullName, podUID, port, stream)
assert.Error(t, err, description)
}
{ // Direct streaming case
description := "direct streaming - " + tc.description
fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID)
assert.NoError(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.PortForward(podFullName, podUID, port, stream)
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
require.Equal(t, fakeRuntime.Args.Pod.ID, podUID, description+": Pod UID")
require.Equal(t, fakeRuntime.Args.Port, port, description+": Port")
require.Equal(t, fakeRuntime.Args.Stream, stream, description+": stream")
}
}
{ // Indirect streaming case
description := "indirect streaming - " + tc.description
fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID)
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
}
err = kubelet.PortForward(podFullName, podUID, port, stream)
assert.Error(t, err, description)
}
}
}
// Tests that identify the host port conflicts are detected correctly.

View File

@ -21,6 +21,7 @@ import (
"io"
"io/ioutil"
"math/rand"
"net/url"
"os"
"path/filepath"
"sort"
@ -662,10 +663,10 @@ func findNextInitContainerToRun(pod *api.Pod, podStatus *kubecontainer.PodStatus
}
// AttachContainer attaches to the container's console
// TODO: Remove this method once the indirect streaming path is fully functional.
func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
// Use `docker attach` directly for in-process docker integration for
// now to unblock other tests.
// TODO: remove this hack after attach is defined in CRI.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.LegacyAttach(id, stdin, stdout, stderr, tty, resize)
}
@ -684,14 +685,51 @@ func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *api.Pod, containerID k
return ReadLogs(path, logOptions, stdout, stderr)
}
// GetExec gets the endpoint the runtime will serve the exec request from.
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
req := &runtimeApi.ExecRequest{
ContainerId: &id.ID,
Cmd: cmd,
Tty: &tty,
Stdin: &stdin,
}
resp, err := m.runtimeService.Exec(req)
if err != nil {
return nil, err
}
return url.Parse(resp.GetUrl())
}
// GetAttach gets the endpoint the runtime will serve the attach request from.
func (m *kubeGenericRuntimeManager) GetAttach(id kubecontainer.ContainerID, stdin, stdout, stderr bool) (*url.URL, error) {
req := &runtimeApi.AttachRequest{
ContainerId: &id.ID,
Stdin: &stdin,
}
resp, err := m.runtimeService.Attach(req)
if err != nil {
return nil, err
}
return url.Parse(resp.GetUrl())
}
// RunInContainer synchronously executes the command in the container, and returns the output.
func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string) ([]byte, error) {
stdout, stderr, err := m.runtimeService.ExecSync(id.ID, cmd, 0)
// NOTE(timstclair): This does not correctly interleave stdout & stderr, but should be sufficient
// for logging purposes. A combined output option will need to be added to the ExecSyncRequest
// if more precise output ordering is ever required.
return append(stdout, stderr...), err
}
// Runs the command in the container of the specified pod using nsenter.
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
// TODO: handle terminal resizing, refer https://github.com/kubernetes/kubernetes/issues/29579
// TODO: Remove this method once the indirect streaming path is fully functional.
func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
// Use `docker exec` directly for in-process docker integration for
// now to unblock other tests.
// TODO: remove this hack after exec is defined in CRI.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.LegacyExec(containerID, cmd, stdin, stdout, stderr, tty, resize)
}

View File

@ -110,6 +110,14 @@ type kubeGenericRuntimeManager struct {
versionCache *cache.ObjectCache
}
type KubeGenericRuntime interface {
kubecontainer.Runtime
kubecontainer.IndirectStreamingRuntime
kubecontainer.ContainerCommandRunner
// TODO(timstclair): Remove this once the indirect path is fully functional.
kubecontainer.DirectStreamingRuntime
}
// NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager
func NewKubeGenericRuntimeManager(
recorder record.EventRecorder,
@ -128,7 +136,7 @@ func NewKubeGenericRuntimeManager(
cpuCFSQuota bool,
runtimeService internalApi.RuntimeService,
imageService internalApi.ImageManagerService,
) (kubecontainer.Runtime, error) {
) (KubeGenericRuntime, error) {
kubeRuntimeManager := &kubeGenericRuntimeManager{
recorder: recorder,
cpuCFSQuota: cpuCFSQuota,
@ -840,7 +848,7 @@ func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namesp
// Anyhow, we only promised "best-effort" restart count reporting, we can just ignore
// these limitations now.
// TODO: move this comment to SyncPod.
podSandboxIDs, err := m.getSandboxIDByPodUID(string(uid), nil)
podSandboxIDs, err := m.getSandboxIDByPodUID(uid, nil)
if err != nil {
return nil, err
}
@ -938,6 +946,7 @@ func (m *kubeGenericRuntimeManager) GetPodContainerID(pod *kubecontainer.Pod) (k
}
// Forward the specified port from the specified pod to the stream.
// TODO: Remove this method once the indirect streaming path is fully functional.
func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
formattedPod := kubecontainer.FormatPod(pod)
if len(pod.Sandboxes) == 0 {
@ -947,7 +956,6 @@ func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uin
// Use docker portforward directly for in-process docker integration
// now to unblock other tests.
// TODO: remove this hack after portforward is defined in CRI.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.LegacyPortForward(pod.Sandboxes[0].ID.ID, port, stream)
}

View File

@ -19,6 +19,7 @@ package kuberuntime
import (
"fmt"
"net"
"net/url"
"sort"
"github.com/golang/glog"
@ -27,6 +28,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
kubetypes "k8s.io/kubernetes/pkg/types"
)
// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
@ -195,10 +197,10 @@ func (m *kubeGenericRuntimeManager) determinePodSandboxIP(podNamespace, podName
// getPodSandboxID gets the sandbox id by podUID and returns ([]sandboxID, error).
// Param state could be nil in order to get all sandboxes belonging to same pod.
func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID string, state *runtimeApi.PodSandboxState) ([]string, error) {
func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID kubetypes.UID, state *runtimeApi.PodSandboxState) ([]string, error) {
filter := &runtimeApi.PodSandboxFilter{
State: state,
LabelSelector: map[string]string{types.KubernetesPodUIDLabel: podUID},
LabelSelector: map[string]string{types.KubernetesPodUIDLabel: string(podUID)},
}
sandboxes, err := m.runtimeService.ListPodSandbox(filter)
if err != nil {
@ -219,3 +221,23 @@ func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID string, state *r
return sandboxIDs, nil
}
// GetPortForward gets the endpoint the runtime will serve the port-forward request from.
func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, podUID kubetypes.UID) (*url.URL, error) {
sandboxIDs, err := m.getSandboxIDByPodUID(podUID, nil)
if err != nil {
return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err)
}
if len(sandboxIDs) == 0 {
return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID))
}
// TODO: Port is unused for now, but we may need it in the future.
req := &runtimeApi.PortForwardRequest{
PodSandboxId: &sandboxIDs[0],
}
resp, err := m.runtimeService.PortForward(req)
if err != nil {
return nil, err
}
return url.Parse(resp.GetUrl())
}

View File

@ -25,7 +25,6 @@ go_library(
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/security/apparmor:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/intstr:go_default_library",
@ -44,6 +43,5 @@ go_test(
"//pkg/api:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/util/intstr:go_default_library",
"//pkg/util/term:go_default_library",
],
)

View File

@ -17,7 +17,6 @@ limitations under the License.
package lifecycle
import (
"bytes"
"fmt"
"io/ioutil"
"net"
@ -29,7 +28,6 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/security/apparmor"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/intstr"
@ -56,14 +54,10 @@ func NewHandlerRunner(httpGetter kubetypes.HttpGetter, commandRunner kubecontain
func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container, handler *api.Handler) (string, error) {
switch {
case handler.Exec != nil:
var (
buffer bytes.Buffer
msg string
)
output := ioutils.WriteCloserWrapper(&buffer)
err := hr.commandRunner.ExecInContainer(containerID, handler.Exec.Command, nil, output, output, false, nil)
var msg string
output, err := hr.commandRunner.RunInContainer(containerID, handler.Exec.Command)
if err != nil {
msg := fmt.Sprintf("Exec lifecycle hook (%v) for Container %q in Pod %q failed - error: %v, message: %q", handler.Exec.Command, container.Name, format.Pod(pod), err, buffer.String())
msg := fmt.Sprintf("Exec lifecycle hook (%v) for Container %q in Pod %q failed - error: %v, message: %q", handler.Exec.Command, container.Name, format.Pod(pod), err, string(output))
glog.V(1).Infof(msg)
}
return msg, err

View File

@ -18,7 +18,6 @@ package lifecycle
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"reflect"
@ -28,7 +27,6 @@ import (
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/term"
)
func TestResolvePortInt(t *testing.T) {
@ -81,14 +79,10 @@ type fakeContainerCommandRunner struct {
ID kubecontainer.ContainerID
}
func (f *fakeContainerCommandRunner) ExecInContainer(id kubecontainer.ContainerID, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (f *fakeContainerCommandRunner) RunInContainer(id kubecontainer.ContainerID, cmd []string) ([]byte, error) {
f.Cmd = cmd
f.ID = id
return nil
}
func (f *fakeContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
return nil
return nil, nil
}
func TestRunHandlerExec(t *testing.T) {

View File

@ -26,7 +26,6 @@ go_library(
"//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/probe:go_default_library",
"//pkg/probe/exec:go_default_library",
"//pkg/probe/http:go_default_library",
@ -57,6 +56,7 @@ go_test(
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/status:go_default_library",
@ -65,7 +65,6 @@ go_test(
"//pkg/util/exec:go_default_library",
"//pkg/util/intstr:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/term:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/golang/glog",
],

View File

@ -17,7 +17,6 @@ limitations under the License.
package prober
import (
"bytes"
"fmt"
"io"
"net"
@ -33,7 +32,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/probe"
execprobe "k8s.io/kubernetes/pkg/probe/exec"
httprobe "k8s.io/kubernetes/pkg/probe/http"
@ -230,13 +228,7 @@ type execInContainer struct {
func (p *prober) newExecInContainer(container api.Container, containerID kubecontainer.ContainerID, cmd []string) exec.Cmd {
return execInContainer{func() ([]byte, error) {
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err := p.runner.ExecInContainer(containerID, cmd, nil, output, output, false, nil)
// Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but
// the command returned a nonzero exit code). Therefore, always return the output along with the
// error.
return buffer.Bytes(), err
return p.runner.RunInContainer(containerID, cmd)
}}
}

View File

@ -19,7 +19,6 @@ package prober
import (
"errors"
"fmt"
"io"
"net/http"
"reflect"
"testing"
@ -27,10 +26,10 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/term"
)
func TestFormatURL(t *testing.T) {
@ -279,40 +278,6 @@ func TestProbe(t *testing.T) {
}
}
type fakeContainerCommandRunner struct {
// what to return
stdoutData string
stderrData string
err error
// actual values when invoked
containerID kubecontainer.ContainerID
cmd []string
stdin io.Reader
tty bool
resize <-chan term.Size
}
var _ kubecontainer.ContainerCommandRunner = &fakeContainerCommandRunner{}
func (f *fakeContainerCommandRunner) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
// record invoked values
f.containerID = containerID
f.cmd = cmd
f.stdin = stdin
f.tty = tty
f.resize = resize
fmt.Fprint(stdout, f.stdoutData)
fmt.Fprint(stdout, f.stderrData)
return f.err
}
func (f *fakeContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
panic("not implemented")
}
func TestNewExecInContainer(t *testing.T) {
tests := []struct {
name string
@ -329,10 +294,9 @@ func TestNewExecInContainer(t *testing.T) {
}
for _, test := range tests {
runner := &fakeContainerCommandRunner{
stdoutData: "foo",
stderrData: "bar",
err: test.err,
runner := &containertest.FakeContainerCommandRunner{
Stdout: "foo",
Err: test.err,
}
prober := &prober{
runner: runner,
@ -344,23 +308,14 @@ func TestNewExecInContainer(t *testing.T) {
exec := prober.newExecInContainer(container, containerID, cmd)
actualOutput, err := exec.CombinedOutput()
if e, a := containerID, runner.containerID; e != a {
if e, a := containerID, runner.ContainerID; e != a {
t.Errorf("%s: container id: expected %v, got %v", test.name, e, a)
}
if e, a := cmd, runner.cmd; !reflect.DeepEqual(e, a) {
if e, a := cmd, runner.Cmd; !reflect.DeepEqual(e, a) {
t.Errorf("%s: cmd: expected %v, got %v", test.name, e, a)
}
if runner.stdin != nil {
t.Errorf("%s: stdin: expected nil, got %v", test.name, runner.stdin)
}
if runner.tty {
t.Errorf("%s: tty: expected false", test.name)
}
if runner.resize != nil {
t.Errorf("%s: resize chan: expected nil, got %v", test.name, runner.resize)
}
// this isn't 100% foolproof as a bug in a real ContainerCommandRunner where it fails to copy to stdout/stderr wouldn't be caught by this test
if e, a := "foobar", string(actualOutput); e != a {
if e, a := "foo", string(actualOutput); e != a {
t.Errorf("%s: output: expected %q, got %q", test.name, e, a)
}
if e, a := fmt.Sprintf("%v", test.err), fmt.Sprintf("%v", err); e != a {

View File

@ -181,6 +181,7 @@ type Runtime struct {
}
var _ kubecontainer.Runtime = &Runtime{}
var _ kubecontainer.DirectStreamingRuntime = &Runtime{}
// TODO(yifan): This duplicates the podGetter in dockertools.
type podGetter interface {
@ -276,7 +277,8 @@ func New(
return nil, fmt.Errorf("rkt: cannot get config from rkt api service: %v", err)
}
rkt.runner = lifecycle.NewHandlerRunner(httpClient, rkt, rkt)
cmdRunner := kubecontainer.DirectStreamingRunner(rkt)
rkt.runner = lifecycle.NewHandlerRunner(httpClient, cmdRunner, rkt)
rkt.imagePuller = images.NewImageManager(recorder, rkt, imageBackOff, serializeImagePulls, imagePullQPS, imagePullBurst)

View File

@ -34,6 +34,7 @@ go_library(
"//pkg/kubelet/server/portforward:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/server/streaming:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/configz:go_default_library",
@ -65,6 +66,7 @@ go_test(
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/types:go_default_library",
@ -75,5 +77,7 @@ go_test(
"//pkg/volume:go_default_library",
"//vendor:github.com/google/cadvisor/info/v1",
"//vendor:github.com/google/cadvisor/info/v2",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:github.com/stretchr/testify/require",
],
)

View File

@ -36,18 +36,18 @@ import (
"github.com/golang/glog"
)
// options contains details about which streams are required for
// Options contains details about which streams are required for
// remote command execution.
type options struct {
stdin bool
stdout bool
stderr bool
tty bool
type Options struct {
Stdin bool
Stdout bool
Stderr bool
TTY bool
expectedStreams int
}
// newOptions creates a new options from the Request.
func newOptions(req *http.Request) (*options, error) {
// NewOptions creates a new Options from the Request.
func NewOptions(req *http.Request) (*Options, error) {
tty := req.FormValue(api.ExecTTYParam) == "1"
stdin := req.FormValue(api.ExecStdinParam) == "1"
stdout := req.FormValue(api.ExecStdoutParam) == "1"
@ -74,11 +74,11 @@ func newOptions(req *http.Request) (*options, error) {
return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr")
}
return &options{
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
return &Options{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
TTY: tty,
expectedStreams: expectedStreams,
}, nil
}
@ -116,7 +116,7 @@ func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-c
}
func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
opts, err := newOptions(req)
opts, err := NewOptions(req)
if err != nil {
runtime.HandleError(err)
w.WriteHeader(http.StatusBadRequest)
@ -143,7 +143,7 @@ func createStreams(req *http.Request, w http.ResponseWriter, supportedStreamProt
return ctx, true
}
func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
@ -183,7 +183,7 @@ func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *opt
handler = &v1ProtocolHandler{}
}
if opts.tty && handler.supportsTerminalResizing() {
if opts.TTY && handler.supportsTerminalResizing() {
opts.expectedStreams++
}
@ -197,7 +197,7 @@ func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *opt
}
ctx.conn = conn
ctx.tty = opts.tty
ctx.tty = opts.TTY
return ctx, true
}

View File

@ -41,12 +41,12 @@ const (
// createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
// along with the approximate duplex value. It also creates the error (3) and resize (4) channels.
func createChannels(opts *options) []wsstream.ChannelType {
func createChannels(opts *Options) []wsstream.ChannelType {
// open the requested channels, and always open the error channel
channels := make([]wsstream.ChannelType, 5)
channels[stdinChannel] = readChannel(opts.stdin)
channels[stdoutChannel] = writeChannel(opts.stdout)
channels[stderrChannel] = writeChannel(opts.stderr)
channels[stdinChannel] = readChannel(opts.Stdin)
channels[stdoutChannel] = writeChannel(opts.Stdout)
channels[stderrChannel] = writeChannel(opts.Stderr)
channels[errorChannel] = wsstream.WriteChannel
channels[resizeChannel] = wsstream.ReadChannel
return channels
@ -70,7 +70,7 @@ func writeChannel(real bool) wsstream.ChannelType {
// createWebSocketStreams returns a context containing the websocket connection and
// streams needed to perform an exec or an attach.
func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *options, idleTimeout time.Duration) (*context, bool) {
func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Options, idleTimeout time.Duration) (*context, bool) {
channels := createChannels(opts)
conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
"": {
@ -104,9 +104,9 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *opti
// Send an empty message to the lowest writable channel to notify the client the connection is established
// TODO: make generic to SPDY and WebSockets and do it outside of this method?
switch {
case opts.stdout:
case opts.Stdout:
streams[stdoutChannel].Write([]byte{})
case opts.stderr:
case opts.Stderr:
streams[stderrChannel].Write([]byte{})
default:
streams[errorChannel].Write([]byte{})
@ -117,7 +117,7 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *opti
stdinStream: streams[stdinChannel],
stdoutStream: streams[stdoutChannel],
stderrStream: streams[stderrChannel],
tty: opts.tty,
tty: opts.TTY,
resizeStream: streams[resizeChannel],
}

View File

@ -23,6 +23,7 @@ import (
"net"
"net/http"
"net/http/pprof"
"net/url"
"reflect"
"strconv"
"strings"
@ -48,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/configz"
@ -178,6 +180,9 @@ type HostInterface interface {
RootFsInfo() (cadvisorapiv2.FsInfo, error)
ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
PLEGHealthCheck() (bool, error)
GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommand.Options) (*url.URL, error)
GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommand.Options) (*url.URL, error)
GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error)
}
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
@ -565,31 +570,59 @@ func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
response.WriteEntity(info)
}
func getContainerCoordinates(request *restful.Request) (namespace, pod string, uid types.UID, container string) {
namespace = request.PathParameter("podNamespace")
pod = request.PathParameter("podID")
if uidStr := request.PathParameter("uid"); uidStr != "" {
uid = types.UID(uidStr)
type requestParams struct {
podNamespace string
podName string
podUID types.UID
containerName string
cmd []string
streamOpts remotecommand.Options
}
func getRequestParams(req *restful.Request) requestParams {
streamOpts, err := remotecommand.NewOptions(req.Request)
if err != nil {
glog.Warningf("Unable to parse request stream options: %v", err)
}
if streamOpts == nil {
streamOpts = &remotecommand.Options{}
}
return requestParams{
podNamespace: req.PathParameter("podNamespace"),
podName: req.PathParameter("podID"),
podUID: types.UID(req.PathParameter("uid")),
containerName: req.PathParameter("containerName"),
cmd: req.Request.URL.Query()[api.ExecCommandParamm],
streamOpts: *streamOpts,
}
container = request.PathParameter("containerName")
return
}
// getAttach handles requests to attach to a container.
func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
params := getRequestParams(request)
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
podFullName := kubecontainer.GetPodFullName(pod)
redirect, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, params.streamOpts)
if err != nil {
response.WriteError(streaming.HTTPStatus(err), err)
return
}
if redirect != nil {
http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound)
return
}
remotecommand.ServeAttach(response.ResponseWriter,
request.Request,
s.host,
kubecontainer.GetPodFullName(pod),
uid,
container,
podFullName,
params.podUID,
params.containerName,
s.host.StreamingConnectionIdleTimeout(),
remotecommand.DefaultStreamCreationTimeout,
remotecommand.SupportedStreamingProtocols)
@ -597,19 +630,30 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response)
// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
params := getRequestParams(request)
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
podFullName := kubecontainer.GetPodFullName(pod)
redirect, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, params.streamOpts)
if err != nil {
response.WriteError(streaming.HTTPStatus(err), err)
return
}
if redirect != nil {
http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound)
return
}
remotecommand.ServeExec(response.ResponseWriter,
request.Request,
s.host,
kubecontainer.GetPodFullName(pod),
uid,
container,
podFullName,
params.podUID,
params.containerName,
s.host.StreamingConnectionIdleTimeout(),
remotecommand.DefaultStreamCreationTimeout,
remotecommand.SupportedStreamingProtocols)
@ -617,14 +661,16 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) {
// getRun handles requests to run a command inside a container.
func (s *Server) getRun(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
params := getRequestParams(request)
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
command := strings.Split(request.QueryParameter("cmd"), " ")
data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), uid, container, command)
// For legacy reasons, run uses different query param than exec.
params.cmd = strings.Split(request.QueryParameter("cmd"), " ")
data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), params.podUID, params.containerName, params.cmd)
if err != nil {
response.WriteError(http.StatusInternalServerError, err)
return
@ -632,15 +678,6 @@ func (s *Server) getRun(request *restful.Request, response *restful.Response) {
writeJsonResponse(response, data)
}
func getPodCoordinates(request *restful.Request) (namespace, pod string, uid types.UID) {
namespace = request.PathParameter("podNamespace")
pod = request.PathParameter("podID")
if uidStr := request.PathParameter("uid"); uidStr != "" {
uid = types.UID(uidStr)
}
return
}
// Derived from go-restful writeJSON.
func writeJsonResponse(response *restful.Response, data []byte) {
if data == nil {
@ -658,16 +695,30 @@ func writeJsonResponse(response *restful.Response, data []byte) {
// getPortForward handles a new restful port forward request. It determines the
// pod name and uid and then calls ServePortForward.
func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid := getPodCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
params := getRequestParams(request)
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
podName := kubecontainer.GetPodFullName(pod)
redirect, err := s.host.GetPortForward(params.podName, params.podNamespace, params.podUID)
if err != nil {
response.WriteError(streaming.HTTPStatus(err), err)
return
}
if redirect != nil {
http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound)
return
}
portforward.ServePortForward(response.ResponseWriter, request.Request, s.host, podName, uid, s.host.StreamingConnectionIdleTimeout(), remotecommand.DefaultStreamCreationTimeout)
portforward.ServePortForward(response.ResponseWriter,
request.Request,
s.host,
kubecontainer.GetPodFullName(pod),
params.podUID,
s.host.StreamingConnectionIdleTimeout(),
remotecommand.DefaultStreamCreationTimeout)
}
// ServeHTTP responds to HTTP requests on the Kubelet.

View File

@ -27,6 +27,7 @@ import (
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"reflect"
"strconv"
"strings"
@ -35,6 +36,8 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/kubernetes/pkg/api"
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/auth/authorizer"
@ -42,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainertesting "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/types"
@ -70,6 +74,7 @@ type fakeKubelet struct {
resyncInterval time.Duration
loopEntryTime time.Time
plegHealth bool
redirectURL *url.URL
}
func (fk *fakeKubelet) ResyncInterval() time.Duration {
@ -132,6 +137,18 @@ func (fk *fakeKubelet) PortForward(name string, uid types.UID, port uint16, stre
return fk.portForwardFunc(name, uid, port, stream)
}
func (fk *fakeKubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommand.Options) (*url.URL, error) {
return fk.redirectURL, nil
}
func (fk *fakeKubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommand.Options) (*url.URL, error) {
return fk.redirectURL, nil
}
func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) {
return fk.redirectURL, nil
}
func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration {
return fk.streamingConnectionIdleTimeoutFunc()
}
@ -1136,6 +1153,7 @@ func testExecAttach(t *testing.T, verb string) {
tty bool
responseStatusCode int
uid bool
responseLocation string
}{
{responseStatusCode: http.StatusBadRequest},
{stdin: true, responseStatusCode: http.StatusSwitchingProtocols},
@ -1144,6 +1162,7 @@ func testExecAttach(t *testing.T, verb string) {
{stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
{responseStatusCode: http.StatusFound, responseLocation: "http://localhost:12345/" + verb},
}
for i, test := range tests {
@ -1154,6 +1173,12 @@ func testExecAttach(t *testing.T, verb string) {
return 0
}
if test.responseLocation != "" {
var err error
fw.fakeKubelet.redirectURL, err = url.Parse(test.responseLocation)
require.NoError(t, err)
}
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
@ -1277,6 +1302,10 @@ func testExecAttach(t *testing.T, verb string) {
if test.responseStatusCode != http.StatusSwitchingProtocols {
c = &http.Client{}
// Don't follow redirects, since we want to inspect the redirect response.
c.CheckRedirect = func(*http.Request, []*http.Request) error {
return http.ErrUseLastResponse
}
} else {
upgradeRoundTripper = spdy.NewRoundTripper(nil)
c = &http.Client{Transport: upgradeRoundTripper}
@ -1297,6 +1326,10 @@ func testExecAttach(t *testing.T, verb string) {
t.Fatalf("%d: response status: expected %v, got %v", i, e, a)
}
if e, a := test.responseLocation, resp.Header.Get("Location"); e != a {
t.Errorf("%d: response location: expected %v, got %v", i, e, a)
}
if test.responseStatusCode != http.StatusSwitchingProtocols {
continue
}
@ -1440,6 +1473,7 @@ func TestServePortForward(t *testing.T) {
clientData string
containerData string
shouldError bool
responseLocation string
}{
{port: "", shouldError: true},
{port: "abc", shouldError: true},
@ -1451,6 +1485,7 @@ func TestServePortForward(t *testing.T) {
{port: "8000", clientData: "client data", containerData: "container data", shouldError: false},
{port: "65535", shouldError: false},
{port: "65535", uid: true, shouldError: false},
{port: "65535", responseLocation: "http://localhost:12345/portforward", shouldError: false},
}
podNamespace := "other"
@ -1466,6 +1501,12 @@ func TestServePortForward(t *testing.T) {
return 0
}
if test.responseLocation != "" {
var err error
fw.fakeKubelet.redirectURL, err = url.Parse(test.responseLocation)
require.NoError(t, err)
}
portForwardFuncDone := make(chan struct{})
fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
@ -1517,6 +1558,10 @@ func TestServePortForward(t *testing.T) {
upgradeRoundTripper := spdy.NewRoundTripper(nil)
c := &http.Client{Transport: upgradeRoundTripper}
// Don't follow redirects, since we want to inspect the redirect response.
c.CheckRedirect = func(*http.Request, []*http.Request) error {
return http.ErrUseLastResponse
}
resp, err := c.Post(url, "", nil)
if err != nil {
@ -1524,6 +1569,14 @@ func TestServePortForward(t *testing.T) {
}
defer resp.Body.Close()
if test.responseLocation != "" {
assert.Equal(t, http.StatusFound, resp.StatusCode, "%d: status code", i)
assert.Equal(t, test.responseLocation, resp.Header.Get("Location"), "%d: location", i)
continue
} else {
assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "%d: status code", i)
}
conn, err := upgradeRoundTripper.NewConnection(resp)
if err != nil {
t.Fatalf("Unexpected error creating streaming connection: %s", err)

View File

@ -24,6 +24,7 @@ go_library(
"//pkg/types:go_default_library",
"//pkg/util/term:go_default_library",
"//vendor:github.com/emicklei/go-restful",
"//vendor:google.golang.org/grpc",
"//vendor:google.golang.org/grpc/codes",
"//vendor:k8s.io/client-go/pkg/api",
],

View File

@ -18,30 +18,27 @@ package streaming
import (
"fmt"
"net/http"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
type ResponseError struct {
Err string
Code codes.Code
}
func (e *ResponseError) Error() string {
return e.Err
}
func ErrorStreamingDisabled(method string) error {
return &ResponseError{
Err: fmt.Sprintf("streaming method %s disabled", method),
Code: codes.NotFound,
}
return grpc.Errorf(codes.NotFound, fmt.Sprintf("streaming method %s disabled", method))
}
func ErrorTimeout(op string, timeout time.Duration) error {
return &ResponseError{
Err: fmt.Sprintf("%s timed out after %s", op, timeout.String()),
Code: codes.DeadlineExceeded,
return grpc.Errorf(codes.DeadlineExceeded, fmt.Sprintf("%s timed out after %s", op, timeout.String()))
}
// Translates a CRI streaming error into an HTTP status code.
func HTTPStatus(err error) int {
switch grpc.Code(err) {
case codes.NotFound:
return http.StatusNotFound
default:
return http.StatusInternalServerError
}
}

View File

@ -17,7 +17,10 @@ go_library(
"resources.go",
],
tags = ["automanaged"],
deps = ["//pkg/api:go_default_library"],
deps = [
"//pkg/api:go_default_library",
"//pkg/types:go_default_library",
],
)
go_test(

View File

@ -22,16 +22,23 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
)
type podHandler func(*api.Pod) string
// Pod returns a string reprenetating a pod in a human readable format,
// Pod returns a string representing a pod in a consistent human readable format,
// with pod UID as part of the string.
func Pod(pod *api.Pod) string {
return PodDesc(pod.Name, pod.Namespace, pod.UID)
}
// PodDesc returns a string representing a pod in a consistent human readable format,
// with pod UID as part of the string.
func PodDesc(podName, podNamespace string, podUID types.UID) string {
// Use underscore as the delimiter because it is not allowed in pod name
// (DNS subdomain format), while allowed in the container name format.
return fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.UID)
return fmt.Sprintf("%s_%s(%s)", podName, podNamespace, podUID)
}
// PodWithDeletionTimestamp is the same as Pod. In addition, it prints the