mirror of https://github.com/k3s-io/k3s
Merge pull request #7177 from yifan-gu/runner
kubelet/dockertools: Remove dockerContainerCommandRunner.
pull/6/head
commit 45575d95b3
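This merge removes the standalone dockerContainerCommandRunner, ports its RunInContainer, ExecInContainer, and PortForward methods onto DockerManager, and switches the kubelet's runner field from dockertools.NewDockerContainerCommandRunner(dockerClient) to containerManager. A minimal, self-contained Go sketch of that wiring pattern (the interface and the fakeManager type below are illustrative stand-ins, not the real kubelet types):

package main

import "fmt"

// ContainerCommandRunner is the role the kubelet needs; before this change it
// was satisfied by a dedicated runner type, afterwards by the manager itself.
type ContainerCommandRunner interface {
	RunInContainer(containerID string, cmd []string) ([]byte, error)
}

// fakeManager stands in for DockerManager in this sketch.
type fakeManager struct{}

func (m *fakeManager) RunInContainer(containerID string, cmd []string) ([]byte, error) {
	return []byte(fmt.Sprintf("ran %v in %s", cmd, containerID)), nil
}

func main() {
	// The kubelet now wires the manager directly as its command runner.
	var runner ContainerCommandRunner = &fakeManager{}
	out, _ := runner.RunInContainer("1234", []string{"ls"})
	fmt.Println(string(out))
}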
@@ -17,14 +17,11 @@ limitations under the License.

package dockertools

import (
	"bufio"
	"bytes"
	"fmt"
	"hash/adler32"
	"io"
	"math/rand"
	"os"
	"os/exec"
	"strconv"
	"strings"
@@ -112,209 +109,6 @@ func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller
	}
}

type dockerContainerCommandRunner struct {
	client DockerInterface
}

// The first version of docker that supports exec natively is 1.3.0 == API 1.15
var dockerAPIVersionWithExec, _ = docker.NewAPIVersion("1.15")

// Returns the major and minor version numbers of docker server.
// TODO(yifan): Remove this once the ContainerCommandRunner is implemented by dockerManager.
func (d *dockerContainerCommandRunner) getDockerServerVersion() (docker.APIVersion, error) {
	env, err := d.client.Version()
	if err != nil {
		return nil, fmt.Errorf("failed to get docker server version - %v", err)
	}

	apiVersion := env.Get("ApiVersion")
	version, err := docker.NewAPIVersion(apiVersion)
	if err != nil {
		return nil, fmt.Errorf("failed to parse docker server version %q: %v", apiVersion, err)
	}

	return version, nil
}

func (d *dockerContainerCommandRunner) nativeExecSupportExists() (bool, error) {
	version, err := d.getDockerServerVersion()
	if err != nil {
		return false, err
	}
	return version.GreaterThanOrEqualTo(dockerAPIVersionWithExec), nil
}

func (d *dockerContainerCommandRunner) getRunInContainerCommand(containerID string, cmd []string) (*exec.Cmd, error) {
	args := append([]string{"exec"}, cmd...)
	command := exec.Command("/usr/sbin/nsinit", args...)
	command.Dir = fmt.Sprintf("/var/lib/docker/execdriver/native/%s", containerID)
	return command, nil
}

func (d *dockerContainerCommandRunner) runInContainerUsingNsinit(containerID string, cmd []string) ([]byte, error) {
	c, err := d.getRunInContainerCommand(containerID, cmd)
	if err != nil {
		return nil, err
	}
	return c.CombinedOutput()
}

// RunInContainer uses nsinit to run the command inside the container identified by containerID
// TODO(yifan): Use strong type for containerID.
func (d *dockerContainerCommandRunner) RunInContainer(containerID string, cmd []string) ([]byte, error) {
	// If native exec support does not exist in the local docker daemon use nsinit.
	useNativeExec, err := d.nativeExecSupportExists()
	if err != nil {
		return nil, err
	}
	if !useNativeExec {
		return d.runInContainerUsingNsinit(containerID, cmd)
	}
	createOpts := docker.CreateExecOptions{
		Container:    containerID,
		Cmd:          cmd,
		AttachStdin:  false,
		AttachStdout: true,
		AttachStderr: true,
		Tty:          false,
	}
	execObj, err := d.client.CreateExec(createOpts)
	if err != nil {
		return nil, fmt.Errorf("failed to run in container - Exec setup failed - %v", err)
	}
	var buf bytes.Buffer
	wrBuf := bufio.NewWriter(&buf)
	startOpts := docker.StartExecOptions{
		Detach:       false,
		Tty:          false,
		OutputStream: wrBuf,
		ErrorStream:  wrBuf,
		RawTerminal:  false,
	}
	errChan := make(chan error, 1)
	go func() {
		errChan <- d.client.StartExec(execObj.ID, startOpts)
	}()
	wrBuf.Flush()
	return buf.Bytes(), <-errChan
}

// ExecInContainer uses nsenter to run the command inside the container identified by containerID.
//
// TODO:
//  - match cgroups of container
//  - should we support `docker exec`?
//  - should we support nsenter in a container, running with elevated privs and --pid=host?
//  - use strong type for containerId
func (d *dockerContainerCommandRunner) ExecInContainer(containerId string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
	nsenter, err := exec.LookPath("nsenter")
	if err != nil {
		return fmt.Errorf("exec unavailable - unable to locate nsenter")
	}

	container, err := d.client.InspectContainer(containerId)
	if err != nil {
		return err
	}

	if !container.State.Running {
		return fmt.Errorf("container not running (%s)", container)
	}

	containerPid := container.State.Pid

	// TODO what if the container doesn't have `env`???
	args := []string{"-t", fmt.Sprintf("%d", containerPid), "-m", "-i", "-u", "-n", "-p", "--", "env", "-i"}
	args = append(args, fmt.Sprintf("HOSTNAME=%s", container.Config.Hostname))
	args = append(args, container.Config.Env...)
	args = append(args, cmd...)
	command := exec.Command(nsenter, args...)
	if tty {
		p, err := StartPty(command)
		if err != nil {
			return err
		}
		defer p.Close()

		// make sure to close the stdout stream
		defer stdout.Close()

		if stdin != nil {
			go io.Copy(p, stdin)
		}

		if stdout != nil {
			go io.Copy(stdout, p)
		}

		return command.Wait()
	} else {
		if stdin != nil {
			// Use an os.Pipe here as it returns true *os.File objects.
			// This way, if you run 'kubectl exec -p <pod> -i bash' (no tty) and type 'exit',
			// the call below to command.Run() can unblock because its Stdin is the read half
			// of the pipe.
			r, w, err := os.Pipe()
			if err != nil {
				return err
			}
			go io.Copy(w, stdin)

			command.Stdin = r
		}
		if stdout != nil {
			command.Stdout = stdout
		}
		if stderr != nil {
			command.Stderr = stderr
		}

		return command.Run()
	}
}

// PortForward executes socat in the pod's network namespace and copies
// data between stream (representing the user's local connection on their
// computer) and the specified port in the container.
//
// TODO:
//  - match cgroups of container
//  - should we support nsenter + socat on the host? (current impl)
//  - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
func (d *dockerContainerCommandRunner) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
	podInfraContainer := pod.FindContainerByName(PodInfraContainerName)
	if podInfraContainer == nil {
		return fmt.Errorf("cannot find pod infra container in pod %q", kubecontainer.BuildPodFullName(pod.Name, pod.Namespace))
	}
	container, err := d.client.InspectContainer(string(podInfraContainer.ID))
	if err != nil {
		return err
	}

	if !container.State.Running {
		return fmt.Errorf("container not running (%s)", container)
	}

	containerPid := container.State.Pid
	// TODO what if the host doesn't have it???
	_, lookupErr := exec.LookPath("socat")
	if lookupErr != nil {
		return fmt.Errorf("Unable to do port forwarding: socat not found.")
	}
	args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)}
	// TODO use exec.LookPath
	command := exec.Command("nsenter", args...)
	command.Stdin = stream
	command.Stdout = stream
	return command.Run()
}

// NewDockerContainerCommandRunner creates a ContainerCommandRunner which uses nsinit to run a command
// inside a container.
func NewDockerContainerCommandRunner(client DockerInterface) ContainerCommandRunner {
	return &dockerContainerCommandRunner{client: client}
}

func parseImageName(image string) (string, string) {
	return parsers.ParseRepositoryTag(image)
}
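The removed RunInContainer prefers native `docker exec` (CreateExec/StartExec) when the daemon supports it and collects the command's output through a bufio.Writer over a bytes.Buffer, with the exec error delivered on a buffered channel. A self-contained sketch of that collection pattern, with startExec standing in for the docker client call and the flush deferred until the exec has finished so this standalone example is deterministic:

package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// runAndCapture mirrors the output-collection pattern in RunInContainer: the
// exec call runs in a goroutine, its error comes back over a buffered channel,
// and whatever it wrote through the bufio.Writer is flushed into the
// bytes.Buffer before both are returned. startExec is an illustrative stand-in
// for client.StartExec.
func runAndCapture(startExec func(out *bufio.Writer) error) ([]byte, error) {
	var buf bytes.Buffer
	wrBuf := bufio.NewWriter(&buf)

	errChan := make(chan error, 1)
	go func() {
		errChan <- startExec(wrBuf)
	}()
	err := <-errChan // wait for the exec to finish before flushing its output
	wrBuf.Flush()
	return buf.Bytes(), err
}

func main() {
	out, err := runAndCapture(func(w *bufio.Writer) error {
		fmt.Fprintln(w, "output from the exec'd command")
		return nil
	})
	fmt.Printf("out=%q err=%v\n", string(out), err)
}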
@@ -144,7 +144,7 @@ func TestVersion(t *testing.T) {

func TestExecSupportExists(t *testing.T) {
	fakeDocker := &FakeDockerClient{VersionInfo: docker.Env{"Version=1.3.0", "ApiVersion=1.15"}}
-	runner := dockerContainerCommandRunner{fakeDocker}
+	runner := &DockerManager{client: fakeDocker}
	useNativeExec, err := runner.nativeExecSupportExists()
	if err != nil {
		t.Errorf("got error while checking for exec support - %s", err)

@@ -156,7 +156,7 @@ func TestExecSupportExists(t *testing.T) {

func TestExecSupportNotExists(t *testing.T) {
	fakeDocker := &FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.2", "ApiVersion=1.14"}}
-	runner := dockerContainerCommandRunner{fakeDocker}
+	runner := &DockerManager{client: fakeDocker}
	useNativeExec, _ := runner.nativeExecSupportExists()
	if useNativeExec {
		t.Errorf("invalid exec support check output.")

@@ -164,7 +164,7 @@ func TestExecSupportNotExists(t *testing.T) {
}

func TestDockerContainerCommand(t *testing.T) {
-	runner := dockerContainerCommandRunner{}
+	runner := &DockerManager{}
	containerID := "1234"
	command := []string{"ls"}
	cmd, _ := runner.getRunInContainerCommand(containerID, command)
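These tests now exercise the same API-version gate through DockerManager: a daemon reporting API 1.15 or newer supports native exec, API 1.14 does not. A rough, self-contained table-driven sketch of that threshold decision (supportsNativeExec is an illustrative helper, not the real nativeExecSupportExists, and the two-part major.minor parsing is an assumption of this sketch):

package exectest

import (
	"strconv"
	"strings"
	"testing"
)

// supportsNativeExec is an illustrative stand-in for the check the rewritten
// tests make against DockerManager: API >= 1.15 means native `docker exec`.
// It only handles two-part major.minor versions, which is all this sketch needs.
func supportsNativeExec(apiVersion string) bool {
	parts := strings.SplitN(apiVersion, ".", 2)
	if len(parts) != 2 {
		return false
	}
	major, err1 := strconv.Atoi(parts[0])
	minor, err2 := strconv.Atoi(parts[1])
	if err1 != nil || err2 != nil {
		return false
	}
	return major > 1 || (major == 1 && minor >= 15)
}

func TestNativeExecThreshold(t *testing.T) {
	cases := []struct {
		apiVersion string
		want       bool
	}{
		{"1.14", false}, // pre-exec daemon, falls back to nsinit
		{"1.15", true},  // first API version with native exec
		{"1.18", true},
	}
	for _, c := range cases {
		if got := supportsNativeExec(c.apiVersion); got != c.want {
			t.Errorf("apiVersion %s: got %v, want %v", c.apiVersion, got, c.want)
		}
	}
}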
@@ -17,11 +17,14 @@ limitations under the License.

package dockertools

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"os/exec"
	"path"
	"strconv"
	"strings"
@@ -697,3 +700,183 @@ func (dm *DockerManager) Version() (kubecontainer.Version, error) {
	}
	return dockerVersion(version), nil
}

// The first version of docker that supports exec natively is 1.3.0 == API 1.15
var dockerAPIVersionWithExec = "1.15"

func (dm *DockerManager) nativeExecSupportExists() (bool, error) {
	version, err := dm.Version()
	if err != nil {
		return false, err
	}
	result, err := version.Compare(dockerAPIVersionWithExec)
	if result >= 0 {
		return true, err
	}
	return false, err
}

func (dm *DockerManager) getRunInContainerCommand(containerID string, cmd []string) (*exec.Cmd, error) {
	args := append([]string{"exec"}, cmd...)
	command := exec.Command("/usr/sbin/nsinit", args...)
	command.Dir = fmt.Sprintf("/var/lib/docker/execdriver/native/%s", containerID)
	return command, nil
}

func (dm *DockerManager) runInContainerUsingNsinit(containerID string, cmd []string) ([]byte, error) {
	c, err := dm.getRunInContainerCommand(containerID, cmd)
	if err != nil {
		return nil, err
	}
	return c.CombinedOutput()
}

// RunInContainer uses nsinit to run the command inside the container identified by containerID
// TODO(yifan): Use strong type for containerID.
func (dm *DockerManager) RunInContainer(containerID string, cmd []string) ([]byte, error) {
	// If native exec support does not exist in the local docker daemon use nsinit.
	useNativeExec, err := dm.nativeExecSupportExists()
	if err != nil {
		return nil, err
	}
	if !useNativeExec {
		return dm.runInContainerUsingNsinit(containerID, cmd)
	}
	createOpts := docker.CreateExecOptions{
		Container:    containerID,
		Cmd:          cmd,
		AttachStdin:  false,
		AttachStdout: true,
		AttachStderr: true,
		Tty:          false,
	}
	execObj, err := dm.client.CreateExec(createOpts)
	if err != nil {
		return nil, fmt.Errorf("failed to run in container - Exec setup failed - %v", err)
	}
	var buf bytes.Buffer
	wrBuf := bufio.NewWriter(&buf)
	startOpts := docker.StartExecOptions{
		Detach:       false,
		Tty:          false,
		OutputStream: wrBuf,
		ErrorStream:  wrBuf,
		RawTerminal:  false,
	}
	errChan := make(chan error, 1)
	go func() {
		errChan <- dm.client.StartExec(execObj.ID, startOpts)
	}()
	wrBuf.Flush()
	return buf.Bytes(), <-errChan
}

// ExecInContainer uses nsenter to run the command inside the container identified by containerID.
//
// TODO:
//  - match cgroups of container
//  - should we support `docker exec`?
//  - should we support nsenter in a container, running with elevated privs and --pid=host?
//  - use strong type for containerId
func (dm *DockerManager) ExecInContainer(containerId string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
	nsenter, err := exec.LookPath("nsenter")
	if err != nil {
		return fmt.Errorf("exec unavailable - unable to locate nsenter")
	}

	container, err := dm.client.InspectContainer(containerId)
	if err != nil {
		return err
	}

	if !container.State.Running {
		return fmt.Errorf("container not running (%s)", container)
	}

	containerPid := container.State.Pid

	// TODO what if the container doesn't have `env`???
	args := []string{"-t", fmt.Sprintf("%d", containerPid), "-m", "-i", "-u", "-n", "-p", "--", "env", "-i"}
	args = append(args, fmt.Sprintf("HOSTNAME=%s", container.Config.Hostname))
	args = append(args, container.Config.Env...)
	args = append(args, cmd...)
	command := exec.Command(nsenter, args...)
	if tty {
		p, err := StartPty(command)
		if err != nil {
			return err
		}
		defer p.Close()

		// make sure to close the stdout stream
		defer stdout.Close()

		if stdin != nil {
			go io.Copy(p, stdin)
		}

		if stdout != nil {
			go io.Copy(stdout, p)
		}

		return command.Wait()
	} else {
		if stdin != nil {
			// Use an os.Pipe here as it returns true *os.File objects.
			// This way, if you run 'kubectl exec -p <pod> -i bash' (no tty) and type 'exit',
			// the call below to command.Run() can unblock because its Stdin is the read half
			// of the pipe.
			r, w, err := os.Pipe()
			if err != nil {
				return err
			}
			go io.Copy(w, stdin)

			command.Stdin = r
		}
		if stdout != nil {
			command.Stdout = stdout
		}
		if stderr != nil {
			command.Stderr = stderr
		}

		return command.Run()
	}
}

// PortForward executes socat in the pod's network namespace and copies
// data between stream (representing the user's local connection on their
// computer) and the specified port in the container.
//
// TODO:
//  - match cgroups of container
//  - should we support nsenter + socat on the host? (current impl)
//  - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
	podInfraContainer := pod.FindContainerByName(PodInfraContainerName)
	if podInfraContainer == nil {
		return fmt.Errorf("cannot find pod infra container in pod %q", kubecontainer.BuildPodFullName(pod.Name, pod.Namespace))
	}
	container, err := dm.client.InspectContainer(string(podInfraContainer.ID))
	if err != nil {
		return err
	}

	if !container.State.Running {
		return fmt.Errorf("container not running (%s)", container)
	}

	containerPid := container.State.Pid
	// TODO what if the host doesn't have it???
	_, lookupErr := exec.LookPath("socat")
	if lookupErr != nil {
		return fmt.Errorf("Unable to do port forwarding: socat not found.")
	}
	args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)}
	// TODO use exec.LookPath
	command := exec.Command("nsenter", args...)
	command.Stdin = stream
	command.Stdout = stream
	return command.Run()
}
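PortForward, shown above on DockerManager, shells out to nsenter to join the pod infra container's network namespace and runs socat to bridge the caller's stream to the target port. A small sketch that assembles the same argument list and prints the resulting command line; the PID and port are made-up example values:

package main

import (
	"fmt"
	"strings"
)

// buildPortForwardArgs mirrors how PortForward assembles its nsenter + socat
// invocation: enter the infra container's network namespace (-n) by PID, then
// bridge stdin/stdout to TCP4:localhost:<port>.
func buildPortForwardArgs(containerPid int, port uint16) []string {
	return []string{
		"-t", fmt.Sprintf("%d", containerPid),
		"-n",
		"socat", "-", fmt.Sprintf("TCP4:localhost:%d", port),
	}
}

func main() {
	args := buildPortForwardArgs(4321, 8080)
	fmt.Println("nsenter " + strings.Join(args, " "))
	// Prints: nsenter -t 4321 -n socat - TCP4:localhost:8080
}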
@@ -210,7 +210,7 @@ func NewMainKubelet(
		resyncInterval:       resyncInterval,
		containerRefManager:  kubecontainer.NewRefManager(),
		readinessManager:     kubecontainer.NewReadinessManager(),
-		runner:               dockertools.NewDockerContainerCommandRunner(dockerClient),
+		runner:               containerManager,
		httpClient:           &http.Client{},
		sourcesReady:         sourcesReady,
		clusterDomain:        clusterDomain,
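One detail worth calling out from the ported ExecInContainer: in the non-tty branch the command's stdin is the read half of an os.Pipe, so closing the write half delivers EOF to the child and lets command.Run() return. A self-contained demonstration of that mechanism, using plain `cat` in place of the nsenter invocation:

package main

import (
	"fmt"
	"io"
	"os"
	"os/exec"
	"strings"
)

func main() {
	// The child's stdin is a real *os.File (the pipe's read half), exactly as in
	// the non-tty exec path.
	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}

	cmd := exec.Command("cat")
	cmd.Stdin = r

	var out strings.Builder
	cmd.Stdout = &out

	go func() {
		io.Copy(w, strings.NewReader("hello from the pipe\n"))
		w.Close() // closing the write half gives cat EOF, so cmd.Run() can return
	}()

	if err := cmd.Run(); err != nil {
		panic(err)
	}
	fmt.Print(out.String())
}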