Add timeout argument to ExecInContainer

This allows us to interrupt/kill the executed command if it exceeds the
timeout (not implemented by this commit).

Set timeout in Exec probes. HTTPGet and TCPSocket probes respect the
timeout, while Exec probes used to ignore it.

Add e2e test for exec probe with timeout. However, the test is skipped
while the default exec handler doesn't support timeouts.
pull/6/head
Rodolfo Carvalho 2016-06-23 19:44:26 +02:00
parent 83c5a1c895
commit 506129ba4e
22 changed files with 70 additions and 36 deletions

View File

@ -53,7 +53,7 @@ type fakeExecutor struct {
exec bool
}
func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
return ex.run(name, uid, container, cmd, in, out, err, tty)
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"hash/adler32"
"strings"
"time"
"github.com/golang/glog"
@ -236,10 +237,10 @@ func DirectStreamingRunner(runtime DirectStreamingRuntime) ContainerCommandRunne
return &containerCommandRunnerWrapper{runtime}
}
func (r *containerCommandRunnerWrapper) RunInContainer(id ContainerID, cmd []string) ([]byte, error) {
func (r *containerCommandRunnerWrapper) RunInContainer(id ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err := r.ExecInContainer(id, cmd, nil, output, output, false, nil)
err := r.ExecInContainer(id, cmd, nil, output, output, false, nil, timeout)
// Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but
// the command returned a nonzero exit code). Therefore, always return the output along with the
// error.

View File

@ -128,7 +128,7 @@ type DirectStreamingRuntime interface {
// Runs the command in the container of the specified pod using nsenter.
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error
// Forward the specified port from the specified pod to the stream.
PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error
// ContainerAttach encapsulates the attaching to containers for testability
@ -165,7 +165,7 @@ type ContainerAttacher interface {
type ContainerCommandRunner interface {
// RunInContainer synchronously executes the command in the container, and returns the output.
// If the command completes with a non-0 exit code, a pkg/util/exec.ExitError will be returned.
RunInContainer(id ContainerID, cmd []string) ([]byte, error)
RunInContainer(id ContainerID, cmd []string, timeout time.Duration) ([]byte, error)
}
// Pod is a group of containers.

View File

@ -307,7 +307,7 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS
return &status, f.Err
}
func (f *FakeDirectStreamingRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (f *FakeDirectStreamingRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
f.Lock()
defer f.Unlock()
@ -487,7 +487,7 @@ type FakeContainerCommandRunner struct {
var _ ContainerCommandRunner = &FakeContainerCommandRunner{}
func (f *FakeContainerCommandRunner) RunInContainer(containerID ContainerID, cmd []string) ([]byte, error) {
func (f *FakeContainerCommandRunner) RunInContainer(containerID ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
// record invoked values
f.ContainerID = containerID
f.Cmd = cmd

View File

@ -18,6 +18,7 @@ package testing
import (
"io"
"time"
"github.com/stretchr/testify/mock"
"k8s.io/kubernetes/pkg/api"
@ -89,7 +90,7 @@ func (r *Mock) GetPodStatus(uid types.UID, name, namespace string) (*PodStatus,
return args.Get(0).(*PodStatus), args.Error(1)
}
func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (r *Mock) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
args := r.Called(containerID, cmd, stdin, stdout, stderr, tty)
return args.Error(0)
}

View File

@ -48,9 +48,7 @@ func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader,
if err != nil {
return err
}
// TODO(timstclair): Add timeout once PR#33366 merges.
return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize)
return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
}
func (r *streamingRuntime) Attach(containerID string, in io.Reader, out, errw io.WriteCloser, resize <-chan term.Size) error {

View File

@ -1265,7 +1265,7 @@ func (d *dockerExitError) ExitStatus() int {
}
// ExecInContainer runs the command inside the container identified by containerID.
func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
if dm.execHandler == nil {
return errors.New("unable to exec without an exec handler")
}
@ -1278,7 +1278,7 @@ func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID,
return fmt.Errorf("container not running (%s)", container.ID)
}
return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty, resize)
return dm.execHandler.ExecInContainer(dm.client, container, cmd, stdin, stdout, stderr, tty, resize, timeout)
}
func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {

View File

@ -32,14 +32,14 @@ import (
// ExecHandler knows how to execute a command in a running Docker container.
type ExecHandler interface {
ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error
}
// NsenterExecHandler executes commands in Docker containers using nsenter.
type NsenterExecHandler struct{}
// TODO should we support nsenter in a container, running with elevated privs and --pid=host?
func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
nsenter, err := exec.LookPath("nsenter")
if err != nil {
return fmt.Errorf("exec unavailable - unable to locate nsenter")
@ -109,7 +109,7 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do
// NativeExecHandler executes commands in Docker containers using Docker's exec API.
type NativeExecHandler struct{}
func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (*NativeExecHandler) ExecInContainer(client DockerInterface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
createOpts := dockertypes.ExecConfig{
Cmd: cmd,
AttachStdin: stdin != nil,

View File

@ -29,6 +29,7 @@ import (
"runtime"
"sort"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
@ -1235,12 +1236,13 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName)
}
return kl.runner.RunInContainer(container.ID, cmd)
// TODO(timstclair): Pass a proper timeout value.
return kl.runner.RunInContainer(container.ID, cmd, 0)
}
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
@ -1253,7 +1255,7 @@ func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, contain
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize)
return streamingRuntime.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize, timeout)
}
// AttachContainer uses the container runtime to attach the given streams to

View File

@ -1081,7 +1081,7 @@ func TestExec(t *testing.T) {
assert.Error(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil)
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil, 0)
assert.Error(t, err, description)
}
{ // Direct streaming case
@ -1093,7 +1093,7 @@ func TestExec(t *testing.T) {
assert.NoError(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil)
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil, 0)
if tc.expectError {
assert.Error(t, err, description)
} else {
@ -1119,7 +1119,7 @@ func TestExec(t *testing.T) {
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
}
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil)
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil, 0)
assert.Error(t, err, description)
}
}

View File

@ -732,7 +732,7 @@ func (m *kubeGenericRuntimeManager) GetAttach(id kubecontainer.ContainerID, stdi
}
// RunInContainer synchronously executes the command in the container, and returns the output.
func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string) ([]byte, error) {
func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
stdout, stderr, err := m.runtimeService.ExecSync(id.ID, cmd, 0)
// NOTE(timstclair): This does not correctly interleave stdout & stderr, but should be sufficient
// for logging purposes. A combined output option will need to be added to the ExecSyncRequest
@ -744,7 +744,7 @@ func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID,
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
// TODO: Remove this method once the indirect streaming path is fully functional.
func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
// Use `docker exec` directly for in-process docker integration for
// now to unblock other tests.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {

View File

@ -55,7 +55,8 @@ func (hr *HandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod
switch {
case handler.Exec != nil:
var msg string
output, err := hr.commandRunner.RunInContainer(containerID, handler.Exec.Command)
// TODO(timstclair): Pass a proper timeout value.
output, err := hr.commandRunner.RunInContainer(containerID, handler.Exec.Command, 0)
if err != nil {
msg := fmt.Sprintf("Exec lifecycle hook (%v) for Container %q in Pod %q failed - error: %v, message: %q", handler.Exec.Command, container.Name, format.Pod(pod), err, string(output))
glog.V(1).Infof(msg)

View File

@ -23,6 +23,7 @@ import (
"reflect"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -79,7 +80,7 @@ type fakeContainerCommandRunner struct {
ID kubecontainer.ContainerID
}
func (f *fakeContainerCommandRunner) RunInContainer(id kubecontainer.ContainerID, cmd []string) ([]byte, error) {
func (f *fakeContainerCommandRunner) RunInContainer(id kubecontainer.ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
f.Cmd = cmd
f.ID = id
return nil, nil

View File

@ -143,7 +143,7 @@ func (pb *prober) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, con
timeout := time.Duration(p.TimeoutSeconds) * time.Second
if p.Exec != nil {
glog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
return pb.exec.Probe(pb.newExecInContainer(container, containerID, p.Exec.Command))
return pb.exec.Probe(pb.newExecInContainer(container, containerID, p.Exec.Command, timeout))
}
if p.HTTPGet != nil {
scheme := strings.ToLower(string(p.HTTPGet.Scheme))
@ -226,9 +226,9 @@ type execInContainer struct {
run func() ([]byte, error)
}
func (p *prober) newExecInContainer(container api.Container, containerID kubecontainer.ContainerID, cmd []string) exec.Cmd {
func (pb *prober) newExecInContainer(container api.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
return execInContainer{func() ([]byte, error) {
return p.runner.RunInContainer(containerID, cmd)
return pb.runner.RunInContainer(containerID, cmd, timeout)
}}
}

View File

@ -305,7 +305,7 @@ func TestNewExecInContainer(t *testing.T) {
container := api.Container{}
containerID := kubecontainer.ContainerID{Type: "docker", ID: "containerID"}
cmd := []string{"/foo", "bar"}
exec := prober.newExecInContainer(container, containerID, cmd)
exec := prober.newExecInContainer(container, containerID, cmd, 0)
actualOutput, err := exec.CombinedOutput()
if e, a := containerID, runner.ContainerID; e != a {

View File

@ -2038,7 +2038,7 @@ func (r *Runtime) AttachContainer(containerID kubecontainer.ContainerID, stdin i
// Note: In rkt, the container ID is in the form of "UUID:appName", where UUID is
// the rkt UUID, and appName is the container name.
// TODO(yifan): If the rkt is using lkvm as the stage1 image, then this function will fail.
func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
glog.V(4).Infof("Rkt execing in container.")
id, err := parseContainerID(containerID)

View File

@ -40,7 +40,7 @@ const (
type Executor interface {
// ExecInContainer executes a command in a container in the pod, copying data
// between in/out/err and the container's stdin/stdout/stderr.
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error
}
// ServeExec handles requests to execute a command in a container. After
@ -56,7 +56,7 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podN
cmd := req.URL.Query()[api.ExecCommandParamm]
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
if err != nil {
if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
rc := exitErr.ExitStatus()

View File

@ -165,7 +165,7 @@ type HostInterface interface {
GetRunningPods() ([]*api.Pod, error)
GetPodByName(namespace, name string) (*api.Pod, bool)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error
ServeLogs(w http.ResponseWriter, req *http.Request)

View File

@ -125,7 +125,7 @@ func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, contain
return fk.runFunc(podFullName, uid, containerName, cmd)
}
func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
return fk.execFunc(name, uid, container, cmd, in, out, err, tty)
}

View File

@ -299,7 +299,7 @@ var _ remotecommand.Executor = &criAdapter{}
var _ remotecommand.Attacher = &criAdapter{}
var _ portforward.PortForwarder = &criAdapter{}
func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
return a.Exec(container, cmd, in, out, err, tty, resize)
}

View File

@ -233,6 +233,35 @@ var _ = framework.KubeDescribe("Probing container", func() {
}, 0, defaultObservationTimeout)
})
It("should be restarted with a docker exec liveness probe with timeout [Conformance]", func() {
// TODO: enable this test once the default exec handler supports timeout.
Skip("The default exec handler, dockertools.NativeExecHandler, does not support timeouts due to a limitation in the Docker Remote API")
runLivenessTest(f, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "liveness-exec",
Labels: map[string]string{"test": "liveness"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "liveness",
Image: "gcr.io/google_containers/busybox:1.24",
Command: []string{"/bin/sh", "-c", "sleep 600"},
LivenessProbe: &api.Probe{
Handler: api.Handler{
Exec: &api.ExecAction{
Command: []string{"/bin/sh", "-c", "sleep 10"},
},
},
InitialDelaySeconds: 15,
TimeoutSeconds: 1,
FailureThreshold: 1,
},
},
},
},
}, 1, defaultObservationTimeout)
})
})
func getContainerStartedTime(p *api.Pod, containerName string) (time.Time, error) {

View File

@ -327,6 +327,7 @@ PrivilegedPod should test privileged pod,vishh,0
Probing container should *not* be restarted with a /healthz http liveness probe,Random-Liu,0
"Probing container should *not* be restarted with a exec ""cat /tmp/health"" liveness probe",Random-Liu,0
Probing container should be restarted with a /healthz http liveness probe,Random-Liu,0
Probing container should be restarted with a docker exec liveness probe with timeout,timstclair,0
"Probing container should be restarted with a exec ""cat /tmp/health"" liveness probe",Random-Liu,0
Probing container should have monotonically increasing restart count,Random-Liu,0
Probing container with readiness probe should not be ready before initial delay and never restart,Random-Liu,0

1 name owner auto-assigned
327 Probing container should *not* be restarted with a /healthz http liveness probe Random-Liu 0
328 Probing container should *not* be restarted with a exec "cat /tmp/health" liveness probe Random-Liu 0
329 Probing container should be restarted with a /healthz http liveness probe Random-Liu 0
330 Probing container should be restarted with a docker exec liveness probe with timeout timstclair 0
331 Probing container should be restarted with a exec "cat /tmp/health" liveness probe Random-Liu 0
332 Probing container should have monotonically increasing restart count Random-Liu 0
333 Probing container with readiness probe should not be ready before initial delay and never restart Random-Liu 0