Merge pull request #6378 from yujuhong/docker_manager

RFC: Kubelet: a step towards better encapsulation of docker functions
pull/6/head
Dawn Chen 2015-04-02 14:22:26 -07:00
commit 67c1678e7e
8 changed files with 546 additions and 524 deletions


@@ -19,11 +19,9 @@ package dockertools
import (
"bufio"
"bytes"
"errors"
"fmt"
"hash/adler32"
"io"
"io/ioutil"
"math/rand"
"os"
"os/exec"
@@ -447,286 +445,6 @@ func (c DockerContainers) FindContainersByPod(podUID types.UID, podFullName stri
return containers
}
// GetKubeletDockerContainers takes a client and a boolean indicating whether to list all containers or just the running ones.
// Returns a map of docker containers that we manage. The map key is the docker container ID
func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (DockerContainers, error) {
result := make(DockerContainers)
containers, err := client.ListContainers(docker.ListContainersOptions{All: allContainers})
if err != nil {
return nil, err
}
for i := range containers {
container := &containers[i]
if len(container.Names) == 0 {
continue
}
// Skip containers that we didn't create to allow users to manually
// spin up their own containers if they want.
// TODO(dchen1107): Remove the old separator "--" by end of Oct
if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") &&
!strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") {
glog.V(3).Infof("Docker Container: %s is not managed by kubelet.", container.Names[0])
continue
}
result[DockerID(container.ID)] = container
}
return result, nil
}
// GetRecentDockerContainersWithNameAndUUID returns a list of dead docker containers which match the name
// and uid given.
func GetRecentDockerContainersWithNameAndUUID(client DockerInterface, podFullName string, uid types.UID, containerName string) ([]*docker.Container, error) {
var result []*docker.Container
containers, err := client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return nil, err
}
for _, dockerContainer := range containers {
if len(dockerContainer.Names) == 0 {
continue
}
dockerName, _, err := ParseDockerName(dockerContainer.Names[0])
if err != nil {
continue
}
if dockerName.PodFullName != podFullName {
continue
}
if uid != "" && dockerName.PodUID != uid {
continue
}
if dockerName.ContainerName != containerName {
continue
}
inspectResult, _ := client.InspectContainer(dockerContainer.ID)
if inspectResult != nil && !inspectResult.State.Running && !inspectResult.State.Paused {
result = append(result, inspectResult)
}
}
return result, nil
}
// GetKubeletDockerContainerLogs returns the logs of a specific container.
// By default the function returns a snapshot of the container log.
// Log streaming is possible if the 'follow' param is set to true.
// Log tailing is possible when the number of tailed lines is set and 'follow' is false.
// TODO: Make 'RawTerminal' option flaggable.
func GetKubeletDockerContainerLogs(client DockerInterface, containerID, tail string, follow bool, stdout, stderr io.Writer) (err error) {
opts := docker.LogsOptions{
Container: containerID,
Stdout: true,
Stderr: true,
OutputStream: stdout,
ErrorStream: stderr,
Timestamps: true,
RawTerminal: false,
Follow: follow,
}
if !follow {
opts.Tail = tail
}
err = client.Logs(opts)
return
}
var (
// ErrNoContainersInPod is returned when there are no containers for a given pod
ErrNoContainersInPod = errors.New("no containers exist for this pod")
// ErrNoPodInfraContainerInPod is returned when there is no pod infra container for a given pod
ErrNoPodInfraContainerInPod = errors.New("No pod infra container exists for this pod")
// ErrContainerCannotRun is returned when a container is created, but cannot run properly
ErrContainerCannotRun = errors.New("Container cannot run")
)
// Internal information kept for containers from inspection
type containerStatusResult struct {
status api.ContainerStatus
ip string
err error
}
func inspectContainer(client DockerInterface, dockerID, containerName, tPath string) *containerStatusResult {
result := containerStatusResult{api.ContainerStatus{}, "", nil}
inspectResult, err := client.InspectContainer(dockerID)
if err != nil {
result.err = err
return &result
}
if inspectResult == nil {
// Why did we not get an error?
return &result
}
glog.V(3).Infof("Container inspect result: %+v", *inspectResult)
result.status = api.ContainerStatus{
Name: containerName,
Image: inspectResult.Config.Image,
ImageID: DockerPrefix + inspectResult.Image,
ContainerID: DockerPrefix + dockerID,
}
waiting := true
if inspectResult.State.Running {
result.status.State.Running = &api.ContainerStateRunning{
StartedAt: util.NewTime(inspectResult.State.StartedAt),
}
if containerName == PodInfraContainerName && inspectResult.NetworkSettings != nil {
result.ip = inspectResult.NetworkSettings.IPAddress
}
waiting = false
} else if !inspectResult.State.FinishedAt.IsZero() {
reason := ""
// Note: An application might handle OOMKilled gracefully.
// In that case, the container is oom killed, but the exit
// code could be 0.
if inspectResult.State.OOMKilled {
reason = "OOM Killed"
} else {
reason = inspectResult.State.Error
}
result.status.State.Termination = &api.ContainerStateTerminated{
ExitCode: inspectResult.State.ExitCode,
Reason: reason,
StartedAt: util.NewTime(inspectResult.State.StartedAt),
FinishedAt: util.NewTime(inspectResult.State.FinishedAt),
}
if tPath != "" {
path, found := inspectResult.Volumes[tPath]
if found {
data, err := ioutil.ReadFile(path)
if err != nil {
glog.Errorf("Error on reading termination-log %s: %v", path, err)
} else {
result.status.State.Termination.Message = string(data)
}
}
}
waiting = false
}
if waiting {
// TODO(dchen1107): Separate issue docker/docker#8294 was filed
// TODO(dchen1107): Need to figure out why we are still waiting
// Check for any issues that prevent the container from running
result.status.State.Waiting = &api.ContainerStateWaiting{
Reason: ErrContainerCannotRun.Error(),
}
}
return &result
}
// GetDockerPodStatus returns docker-related status for all containers in the pod/manifest and the
// infrastructure container
func GetDockerPodStatus(client DockerInterface, manifest api.PodSpec, podFullName string, uid types.UID) (*api.PodStatus, error) {
var podStatus api.PodStatus
statuses := make(map[string]api.ContainerStatus)
expectedContainers := make(map[string]api.Container)
for _, container := range manifest.Containers {
expectedContainers[container.Name] = container
}
expectedContainers[PodInfraContainerName] = api.Container{}
containers, err := client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return nil, err
}
for _, value := range containers {
if len(value.Names) == 0 {
continue
}
dockerName, _, err := ParseDockerName(value.Names[0])
if err != nil {
continue
}
if dockerName.PodFullName != podFullName {
continue
}
if uid != "" && dockerName.PodUID != uid {
continue
}
dockerContainerName := dockerName.ContainerName
c, found := expectedContainers[dockerContainerName]
terminationMessagePath := ""
if !found {
// TODO(dchen1107): should figure out why we don't continue here
// continue
} else {
terminationMessagePath = c.TerminationMessagePath
}
// We assume docker returns a list of containers in time order
if containerStatus, found := statuses[dockerContainerName]; found {
containerStatus.RestartCount += 1
statuses[dockerContainerName] = containerStatus
continue
}
result := inspectContainer(client, value.ID, dockerContainerName, terminationMessagePath)
if result.err != nil {
return nil, result.err
}
// Add user container information
if dockerContainerName == PodInfraContainerName &&
result.status.State.Running != nil {
// Found network container
podStatus.PodIP = result.ip
} else {
statuses[dockerContainerName] = result.status
}
}
if len(statuses) == 0 && podStatus.PodIP == "" {
return nil, ErrNoContainersInPod
}
// Not all expected containers were created; check whether there are
// image-related issues
if len(statuses) < len(manifest.Containers) {
var containerStatus api.ContainerStatus
for _, container := range manifest.Containers {
if _, found := statuses[container.Name]; found {
continue
}
image := container.Image
// Check whether the image is ready on the node
// TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists
_, err := client.InspectImage(image)
if err == nil {
containerStatus.State.Waiting = &api.ContainerStateWaiting{
Reason: fmt.Sprintf("Image: %s is ready, container is creating", image),
}
} else if err == docker.ErrNoSuchImage {
containerStatus.State.Waiting = &api.ContainerStateWaiting{
Reason: fmt.Sprintf("Image: %s is not ready on the node", image),
}
} else {
containerStatus.State.Waiting = &api.ContainerStateWaiting{
Reason: err.Error(),
}
}
statuses[container.Name] = containerStatus
}
}
podStatus.ContainerStatuses = make([]api.ContainerStatus, 0)
for _, status := range statuses {
podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, status)
}
return &podStatus, nil
}
const containerNamePrefix = "k8s"
func HashContainer(container *api.Container) uint64 {
@@ -781,23 +499,6 @@ func ParseDockerName(name string) (dockerName *KubeletContainerName, hash uint64
return &KubeletContainerName{podFullName, podUID, containerName}, hash, nil
}
func GetRunningContainers(client DockerInterface, ids []string) ([]*docker.Container, error) {
result := []*docker.Container{}
if client == nil {
return nil, fmt.Errorf("unexpected nil docker client.")
}
for ix := range ids {
status, err := client.InspectContainer(ids[ix])
if err != nil {
return nil, err
}
if status != nil && status.State.Running {
result = append(result, status)
}
}
return result, nil
}
// Get a docker endpoint, either from the string passed in, or the $DOCKER_HOST environment variable
func getDockerEndpoint(dockerEndpoint string) string {
var endpoint string
@@ -833,6 +534,47 @@ type ContainerCommandRunner interface {
PortForward(podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error
}
func milliCPUToShares(milliCPU int64) int64 {
if milliCPU == 0 {
// zero milliCPU means unset. Use kernel default.
return 0
}
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
if shares < minShares {
return minShares
}
return shares
}
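For a sense of the arithmetic, here is a minimal standalone sketch. It assumes the package constants keep the values defined elsewhere in this file (sharesPerCPU = 1024, milliCPUToCPU = 1000, minShares = 2): a 250m CPU limit maps to 256 shares, and a 1m limit is clamped up to minShares.

package main

import "fmt"

const (
    minShares     = 2    // assumed value of the package constant
    sharesPerCPU  = 1024 // assumed value of the package constant
    milliCPUToCPU = 1000 // assumed value of the package constant
)

// milliCPUToShares mirrors the function above.
func milliCPUToShares(milliCPU int64) int64 {
    if milliCPU == 0 {
        return 0 // unset; fall back to the kernel default
    }
    shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
    if shares < minShares {
        return minShares
    }
    return shares
}

func main() {
    fmt.Println(milliCPUToShares(250)) // 256
    fmt.Println(milliCPUToShares(1))   // 2 (clamped to minShares)
    fmt.Println(milliCPUToShares(0))   // 0 (kernel default)
}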
// GetKubeletDockerContainers lists all containers or just the running ones.
// Returns a map of docker containers that we manage, keyed by container ID.
// TODO: Move this function with dockerCache to DockerManager.
func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (DockerContainers, error) {
result := make(DockerContainers)
containers, err := client.ListContainers(docker.ListContainersOptions{All: allContainers})
if err != nil {
return nil, err
}
for i := range containers {
container := &containers[i]
if len(container.Names) == 0 {
continue
}
// Skip containers that we didn't create to allow users to manually
// spin up their own containers if they want.
// TODO(dchen1107): Remove the old separator "--" by end of Oct
if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") &&
!strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") {
glog.V(3).Infof("Docker Container: %s is not managed by kubelet.", container.Names[0])
continue
}
result[DockerID(container.ID)] = container
}
return result, nil
}
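To make the filter concrete, a self-contained sketch of the prefix test (the sample names are invented; real names are produced by BuildDockerName using the containerNamePrefix constant defined elsewhere in this file):

package main

import (
    "fmt"
    "strings"
)

const containerNamePrefix = "k8s"

// managedByKubelet mirrors the prefix check in GetKubeletDockerContainers.
func managedByKubelet(name string) bool {
    return strings.HasPrefix(name, "/"+containerNamePrefix+"_") ||
        strings.HasPrefix(name, "/"+containerNamePrefix+"--") // legacy "--" separator
}

func main() {
    fmt.Println(managedByKubelet("/k8s_mycontainer_mypod_abcd1234")) // true (name is illustrative)
    fmt.Println(managedByKubelet("/user-started-container"))        // false: left alone by the kubelet
}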
// TODO: Move this function with dockerCache to DockerManager.
func GetPods(client DockerInterface, all bool) ([]*kubecontainer.Pod, error) {
pods := make(map[types.UID]*kubecontainer.Pod)
var result []*kubecontainer.Pod
@@ -881,64 +623,3 @@ func GetPods(client DockerInterface, all bool) ([]*kubecontainer.Pod, error) {
}
return result, nil
}
func milliCPUToShares(milliCPU int64) int64 {
if milliCPU == 0 {
// zero milliCPU means unset. Use kernel default.
return 0
}
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
if shares < minShares {
return minShares
}
return shares
}
func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
exposedPorts := map[docker.Port]struct{}{}
portBindings := map[docker.Port][]docker.PortBinding{}
for _, port := range container.Ports {
exteriorPort := port.HostPort
if exteriorPort == 0 {
// No need to do port binding when HostPort is not specified
continue
}
interiorPort := port.ContainerPort
// Some of this port stuff is under-documented voodoo.
// See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api
var protocol string
switch strings.ToUpper(string(port.Protocol)) {
case "UDP":
protocol = "/udp"
case "TCP":
protocol = "/tcp"
default:
glog.Warningf("Unknown protocol %q: defaulting to TCP", port.Protocol)
protocol = "/tcp"
}
dockerPort := docker.Port(strconv.Itoa(interiorPort) + protocol)
exposedPorts[dockerPort] = struct{}{}
portBindings[dockerPort] = []docker.PortBinding{
{
HostPort: strconv.Itoa(exteriorPort),
HostIP: port.HostIP,
},
}
}
return exposedPorts, portBindings
}
func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType) ([]string, []string) {
var (
addCaps []string
dropCaps []string
)
for _, cap := range capAdd {
addCaps = append(addCaps, string(cap))
}
for _, cap := range capDrop {
dropCaps = append(dropCaps, string(cap))
}
return addCaps, dropCaps
}


@@ -23,6 +23,7 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -394,6 +395,8 @@ func TestIsImagePresent(t *testing.T) {
func TestGetRunningContainers(t *testing.T) {
fakeDocker := &FakeDockerClient{}
fakeRecorder := &record.FakeRecorder{}
containerManager := NewDockerManager(fakeDocker, fakeRecorder)
tests := []struct {
containers map[string]*docker.Container
inputIDs []string
@@ -476,7 +479,7 @@
for _, test := range tests {
fakeDocker.ContainerMap = test.containers
fakeDocker.Err = test.err
if results, err := GetRunningContainers(fakeDocker, test.inputIDs); err == nil {
if results, err := containerManager.GetRunningContainers(test.inputIDs); err == nil {
resultIDs := []string{}
for _, result := range results {
resultIDs = append(resultIDs, result.ID)


@@ -0,0 +1,489 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockertools
import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
)
// Implements kubecontainer.ContainerRunner.
// TODO: Eventually DockerManager should implement the kubecontainer.Runtime
// interface, and it should also add a cache to replace dockerCache.
type DockerManager struct {
client DockerInterface
recorder record.EventRecorder
}
// Ensures DockerManager implements ContainerRunner.
var _ kubecontainer.ContainerRunner = new(DockerManager)
func NewDockerManager(client DockerInterface, recorder record.EventRecorder) *DockerManager {
return &DockerManager{client: client, recorder: recorder}
}
// GetRecentDockerContainersWithNameAndUUID returns a list of dead docker containers which match the name
// and uid given.
func (self *DockerManager) GetRecentDockerContainersWithNameAndUUID(podFullName string, uid types.UID,
containerName string) ([]*docker.Container, error) {
var result []*docker.Container
containers, err := self.client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return nil, err
}
for _, dockerContainer := range containers {
if len(dockerContainer.Names) == 0 {
continue
}
dockerName, _, err := ParseDockerName(dockerContainer.Names[0])
if err != nil {
continue
}
if dockerName.PodFullName != podFullName {
continue
}
if uid != "" && dockerName.PodUID != uid {
continue
}
if dockerName.ContainerName != containerName {
continue
}
inspectResult, _ := self.client.InspectContainer(dockerContainer.ID)
if inspectResult != nil && !inspectResult.State.Running && !inspectResult.State.Paused {
result = append(result, inspectResult)
}
}
return result, nil
}
// GetKubeletDockerContainerLogs returns logs of a specific container. By
// default, it returns a snapshot of the container log. Set |follow| to true to
// stream the log. Set |follow| to false and specify the number of lines (e.g.
// "100" or "all") to tail the log.
// TODO: Make 'RawTerminal' option flaggable.
func (self *DockerManager) GetKubeletDockerContainerLogs(containerID, tail string, follow bool, stdout, stderr io.Writer) (err error) {
opts := docker.LogsOptions{
Container: containerID,
Stdout: true,
Stderr: true,
OutputStream: stdout,
ErrorStream: stderr,
Timestamps: true,
RawTerminal: false,
Follow: follow,
}
if !follow {
opts.Tail = tail
}
err = self.client.Logs(opts)
return
}
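To make the two modes concrete, a hedged usage sketch (not part of this change; "abc123" is a hypothetical container ID):

// demoLogs assumes it lives in package dockertools with "os" imported.
// It shows the snapshot and streaming modes of the method above.
func demoLogs(m *DockerManager) error {
    // follow=false: Tail is honored; fetch a snapshot of the last 100 lines.
    if err := m.GetKubeletDockerContainerLogs("abc123", "100", false, os.Stdout, os.Stderr); err != nil {
        return err
    }
    // follow=true: Tail is ignored; stream the log as it grows.
    return m.GetKubeletDockerContainerLogs("abc123", "all", true, os.Stdout, os.Stderr)
}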
var (
// ErrNoContainersInPod is returned when there are no containers for a given pod
ErrNoContainersInPod = errors.New("no containers exist for this pod")
// ErrNoPodInfraContainerInPod is returned when there is no pod infra container for a given pod
ErrNoPodInfraContainerInPod = errors.New("No pod infra container exists for this pod")
// ErrContainerCannotRun is returned when a container is created, but cannot run properly
ErrContainerCannotRun = errors.New("Container cannot run")
)
// Internal information kept for containers from inspection
type containerStatusResult struct {
status api.ContainerStatus
ip string
err error
}
func (self *DockerManager) inspectContainer(dockerID, containerName, tPath string) *containerStatusResult {
result := containerStatusResult{api.ContainerStatus{}, "", nil}
inspectResult, err := self.client.InspectContainer(dockerID)
if err != nil {
result.err = err
return &result
}
if inspectResult == nil {
// Why did we not get an error?
return &result
}
glog.V(3).Infof("Container inspect result: %+v", *inspectResult)
result.status = api.ContainerStatus{
Name: containerName,
Image: inspectResult.Config.Image,
ImageID: DockerPrefix + inspectResult.Image,
ContainerID: DockerPrefix + dockerID,
}
waiting := true
if inspectResult.State.Running {
result.status.State.Running = &api.ContainerStateRunning{
StartedAt: util.NewTime(inspectResult.State.StartedAt),
}
if containerName == PodInfraContainerName && inspectResult.NetworkSettings != nil {
result.ip = inspectResult.NetworkSettings.IPAddress
}
waiting = false
} else if !inspectResult.State.FinishedAt.IsZero() {
reason := ""
// Note: An application might handle OOMKilled gracefully.
// In that case, the container is oom killed, but the exit
// code could be 0.
if inspectResult.State.OOMKilled {
reason = "OOM Killed"
} else {
reason = inspectResult.State.Error
}
result.status.State.Termination = &api.ContainerStateTerminated{
ExitCode: inspectResult.State.ExitCode,
Reason: reason,
StartedAt: util.NewTime(inspectResult.State.StartedAt),
FinishedAt: util.NewTime(inspectResult.State.FinishedAt),
}
if tPath != "" {
path, found := inspectResult.Volumes[tPath]
if found {
data, err := ioutil.ReadFile(path)
if err != nil {
glog.Errorf("Error on reading termination-log %s: %v", path, err)
} else {
result.status.State.Termination.Message = string(data)
}
}
}
waiting = false
}
if waiting {
// TODO(dchen1107): Separate issue docker/docker#8294 was filed
// TODO(dchen1107): Need to figure out why we are still waiting
// Check for any issues that prevent the container from running
result.status.State.Waiting = &api.ContainerStateWaiting{
Reason: ErrContainerCannotRun.Error(),
}
}
return &result
}
// GetPodStatus returns docker-related status for all containers in the pod as
// well as the infrastructure container.
func (self *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
manifest := pod.Spec
var podStatus api.PodStatus
statuses := make(map[string]api.ContainerStatus)
expectedContainers := make(map[string]api.Container)
for _, container := range manifest.Containers {
expectedContainers[container.Name] = container
}
expectedContainers[PodInfraContainerName] = api.Container{}
containers, err := self.client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return nil, err
}
for _, value := range containers {
if len(value.Names) == 0 {
continue
}
dockerName, _, err := ParseDockerName(value.Names[0])
if err != nil {
continue
}
if dockerName.PodFullName != podFullName {
continue
}
if uid != "" && dockerName.PodUID != uid {
continue
}
dockerContainerName := dockerName.ContainerName
c, found := expectedContainers[dockerContainerName]
terminationMessagePath := ""
if !found {
// TODO(dchen1107): should figure out why we don't continue here
// continue
} else {
terminationMessagePath = c.TerminationMessagePath
}
// We assume docker returns a list of containers in time order
if containerStatus, found := statuses[dockerContainerName]; found {
containerStatus.RestartCount += 1
statuses[dockerContainerName] = containerStatus
continue
}
result := self.inspectContainer(value.ID, dockerContainerName, terminationMessagePath)
if result.err != nil {
return nil, result.err
}
// Add user container information
if dockerContainerName == PodInfraContainerName &&
result.status.State.Running != nil {
// Found network container
podStatus.PodIP = result.ip
} else {
statuses[dockerContainerName] = result.status
}
}
if len(statuses) == 0 && podStatus.PodIP == "" {
return nil, ErrNoContainersInPod
}
// Not all expected containers were created; check whether there are
// image-related issues
if len(statuses) < len(manifest.Containers) {
var containerStatus api.ContainerStatus
for _, container := range manifest.Containers {
if _, found := statuses[container.Name]; found {
continue
}
image := container.Image
// Check whether the image is ready on the node
// TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists
_, err := self.client.InspectImage(image)
if err == nil {
containerStatus.State.Waiting = &api.ContainerStateWaiting{
Reason: fmt.Sprintf("Image: %s is ready, container is creating", image),
}
} else if err == docker.ErrNoSuchImage {
containerStatus.State.Waiting = &api.ContainerStateWaiting{
Reason: fmt.Sprintf("Image: %s is not ready on the node", image),
}
} else {
containerStatus.State.Waiting = &api.ContainerStateWaiting{
Reason: "",
}
}
statuses[container.Name] = containerStatus
}
}
podStatus.ContainerStatuses = make([]api.ContainerStatus, 0)
for _, status := range statuses {
podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, status)
}
return &podStatus, nil
}
func (self *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) {
result := []*docker.Container{}
if self.client == nil {
return nil, fmt.Errorf("unexpected nil docker client.")
}
for ix := range ids {
status, err := self.client.InspectContainer(ids[ix])
if err != nil {
return nil, err
}
if status != nil && status.State.Running {
result = append(result, status)
}
}
return result, nil
}
func (self *DockerManager) RunContainer(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions) (string, error) {
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
dockerName := KubeletContainerName{
PodFullName: kubecontainer.GetPodFullName(pod),
PodUID: pod.UID,
ContainerName: container.Name,
}
exposedPorts, portBindings := makePortsAndBindings(container)
// TODO(vmarmol): Handle better.
// Cap hostname at 63 chars (the specification is 64 bytes, which is 63 chars plus the null terminating char).
const hostnameMaxLen = 63
containerHostname := pod.Name
if len(containerHostname) > hostnameMaxLen {
containerHostname = containerHostname[:hostnameMaxLen]
}
dockerOpts := docker.CreateContainerOptions{
Name: BuildDockerName(dockerName, container),
Config: &docker.Config{
Env: opts.Envs,
ExposedPorts: exposedPorts,
Hostname: containerHostname,
Image: container.Image,
Memory: container.Resources.Limits.Memory().Value(),
CPUShares: milliCPUToShares(container.Resources.Limits.Cpu().MilliValue()),
WorkingDir: container.WorkingDir,
},
}
setEntrypointAndCommand(container, &dockerOpts)
glog.V(3).Infof("Container %v/%v/%v: setting entrypoint \"%v\" and command \"%v\"", pod.Namespace, pod.Name, container.Name, dockerOpts.Config.Entrypoint, dockerOpts.Config.Cmd)
dockerContainer, err := self.client.CreateContainer(dockerOpts)
if err != nil {
if ref != nil {
self.recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err)
}
return "", err
}
if ref != nil {
self.recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID)
}
// The reason we create and mount the log file here (not in kubelet) is that
// the file's location depends on the ID of the container, and we need to create and
// mount the file before actually starting the container.
// TODO(yifan): Consider pulling this logic out since we might need to reuse it in
// other container runtimes.
if opts.PodContainerDir != "" && len(container.TerminationMessagePath) != 0 {
containerLogPath := path.Join(opts.PodContainerDir, dockerContainer.ID)
fs, err := os.Create(containerLogPath)
if err != nil {
// TODO: Clean up the previously created dir? Return the error?
glog.Errorf("Error on creating termination-log file %q: %v", containerLogPath, err)
} else {
fs.Close() // Close immediately; we're just doing a `touch` here
b := fmt.Sprintf("%s:%s", containerLogPath, container.TerminationMessagePath)
opts.Binds = append(opts.Binds, b)
}
}
privileged := false
if capabilities.Get().AllowPrivileged {
privileged = container.Privileged
} else if container.Privileged {
return "", fmt.Errorf("container requested privileged mode, but it is disallowed globally.")
}
capAdd, capDrop := makeCapabilites(container.Capabilities.Add, container.Capabilities.Drop)
hc := &docker.HostConfig{
PortBindings: portBindings,
Binds: opts.Binds,
NetworkMode: opts.NetMode,
IpcMode: opts.IpcMode,
Privileged: privileged,
CapAdd: capAdd,
CapDrop: capDrop,
}
if len(opts.DNS) > 0 {
hc.DNS = opts.DNS
}
if len(opts.DNSSearch) > 0 {
hc.DNSSearch = opts.DNSSearch
}
if err = self.client.StartContainer(dockerContainer.ID, hc); err != nil {
if ref != nil {
self.recorder.Eventf(ref, "failed",
"Failed to start with docker id %v with error: %v", dockerContainer.ID, err)
}
return "", err
}
if ref != nil {
self.recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID)
}
return dockerContainer.ID, nil
}
func setEntrypointAndCommand(container *api.Container, opts *docker.CreateContainerOptions) {
if len(container.Command) != 0 {
opts.Config.Entrypoint = container.Command
}
if len(container.Args) != 0 {
opts.Config.Cmd = container.Args
}
}
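The mapping is the conventional one: a non-empty Command replaces the image's ENTRYPOINT and a non-empty Args replaces its CMD, while empty fields keep the image defaults. A hedged sketch (not part of this change; values invented):

// demoEntrypoint assumes it lives in package dockertools.
func demoEntrypoint() {
    container := &api.Container{
        Command: []string{"/bin/sh", "-c"}, // overrides the image ENTRYPOINT
        Args:    []string{"echo hello"},    // overrides the image CMD
    }
    opts := docker.CreateContainerOptions{Config: &docker.Config{}}
    setEntrypointAndCommand(container, &opts)
    // opts.Config.Entrypoint is now ["/bin/sh", "-c"]; opts.Config.Cmd is ["echo hello"].
}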
func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
exposedPorts := map[docker.Port]struct{}{}
portBindings := map[docker.Port][]docker.PortBinding{}
for _, port := range container.Ports {
exteriorPort := port.HostPort
if exteriorPort == 0 {
// No need to do port binding when HostPort is not specified
continue
}
interiorPort := port.ContainerPort
// Some of this port stuff is under-documented voodoo.
// See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api
var protocol string
switch strings.ToUpper(string(port.Protocol)) {
case "UDP":
protocol = "/udp"
case "TCP":
protocol = "/tcp"
default:
glog.Warningf("Unknown protocol %q: defaulting to TCP", port.Protocol)
protocol = "/tcp"
}
dockerPort := docker.Port(strconv.Itoa(interiorPort) + protocol)
exposedPorts[dockerPort] = struct{}{}
portBindings[dockerPort] = []docker.PortBinding{
{
HostPort: strconv.Itoa(exteriorPort),
HostIP: port.HostIP,
},
}
}
return exposedPorts, portBindings
}
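Concretely, a container port of 8080/TCP with HostPort 31000 yields the exposed-port key "8080/tcp" bound to host port "31000", while a port with HostPort 0 is skipped entirely. A hedged sketch of the key construction (not part of this change; values invented):

// demoPortKey assumes it lives in package dockertools with "strconv" imported.
func demoPortKey() {
    // docker.Port keys are "<containerPort>/<protocol>" strings.
    dockerPort := docker.Port(strconv.Itoa(8080) + "/tcp") // "8080/tcp"
    binding := docker.PortBinding{
        HostPort: strconv.Itoa(31000), // host side of the mapping
        HostIP:   "",                  // empty binds on all host interfaces
    }
    exposedPorts := map[docker.Port]struct{}{dockerPort: {}}
    portBindings := map[docker.Port][]docker.PortBinding{dockerPort: {binding}}
    _, _ = exposedPorts, portBindings
}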
func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType) ([]string, []string) {
var (
addCaps []string
dropCaps []string
)
for _, cap := range capAdd {
addCaps = append(addCaps, string(cap))
}
for _, cap := range capDrop {
dropCaps = append(dropCaps, string(cap))
}
return addCaps, dropCaps
}
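Putting the new type together: the former package-level helpers are now methods on a DockerManager constructed once from a client and an event recorder. A minimal sketch using the package's fakes, mirroring the updated TestGetRunningContainers above (IDs are invented):

// demoManager assumes it lives in package dockertools next to the fakes.
func demoManager() {
    fakeDocker := &FakeDockerClient{}
    fakeRecorder := &record.FakeRecorder{}
    manager := NewDockerManager(fakeDocker, fakeRecorder)

    // Formerly GetRunningContainers(client, ids); now a method on the manager.
    running, err := manager.GetRunningContainers([]string{"id1", "id2"})
    if err != nil {
        glog.Errorf("failed to inspect containers: %v", err)
        return
    }
    _ = running
}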


@@ -1,148 +0,0 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockertools
import (
"fmt"
"os"
"path"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
)
type DockerContainerRunner struct {
Client DockerInterface
Recorder record.EventRecorder
}
func (r *DockerContainerRunner) RunContainer(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions) (string, error) {
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
dockerName := KubeletContainerName{
PodFullName: kubecontainer.GetPodFullName(pod),
PodUID: pod.UID,
ContainerName: container.Name,
}
exposedPorts, portBindings := makePortsAndBindings(container)
// TODO(vmarmol): Handle better.
// Cap hostname at 63 chars (the specification is 64 bytes, which is 63 chars plus the null terminating char).
const hostnameMaxLen = 63
containerHostname := pod.Name
if len(containerHostname) > hostnameMaxLen {
containerHostname = containerHostname[:hostnameMaxLen]
}
dockerOpts := docker.CreateContainerOptions{
Name: BuildDockerName(dockerName, container),
Config: &docker.Config{
Env: opts.Envs,
ExposedPorts: exposedPorts,
Hostname: containerHostname,
Image: container.Image,
Memory: container.Resources.Limits.Memory().Value(),
CPUShares: milliCPUToShares(container.Resources.Limits.Cpu().MilliValue()),
WorkingDir: container.WorkingDir,
},
}
setEntrypointAndCommand(container, &dockerOpts)
glog.V(3).Infof("Container %v/%v/%v: setting entrypoint \"%v\" and command \"%v\"", pod.Namespace, pod.Name, container.Name, dockerOpts.Config.Entrypoint, dockerOpts.Config.Cmd)
dockerContainer, err := r.Client.CreateContainer(dockerOpts)
if err != nil {
if ref != nil {
r.Recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err)
}
return "", err
}
if ref != nil {
r.Recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID)
}
// The reason we create and mount the log file here (not in kubelet) is that
// the file's location depends on the ID of the container, and we need to create and
// mount the file before actually starting the container.
// TODO(yifan): Consider pulling this logic out since we might need to reuse it in
// other container runtimes.
if opts.PodContainerDir != "" && len(container.TerminationMessagePath) != 0 {
containerLogPath := path.Join(opts.PodContainerDir, dockerContainer.ID)
fs, err := os.Create(containerLogPath)
if err != nil {
// TODO: Clean up the previously created dir? Return the error?
glog.Errorf("Error on creating termination-log file %q: %v", containerLogPath, err)
} else {
fs.Close() // Close immediately; we're just doing a `touch` here
b := fmt.Sprintf("%s:%s", containerLogPath, container.TerminationMessagePath)
opts.Binds = append(opts.Binds, b)
}
}
privileged := false
if capabilities.Get().AllowPrivileged {
privileged = container.Privileged
} else if container.Privileged {
return "", fmt.Errorf("container requested privileged mode, but it is disallowed globally.")
}
capAdd, capDrop := makeCapabilites(container.Capabilities.Add, container.Capabilities.Drop)
hc := &docker.HostConfig{
PortBindings: portBindings,
Binds: opts.Binds,
NetworkMode: opts.NetMode,
IpcMode: opts.IpcMode,
Privileged: privileged,
CapAdd: capAdd,
CapDrop: capDrop,
}
if len(opts.DNS) > 0 {
hc.DNS = opts.DNS
}
if len(opts.DNSSearch) > 0 {
hc.DNSSearch = opts.DNSSearch
}
if err = r.Client.StartContainer(dockerContainer.ID, hc); err != nil {
if ref != nil {
r.Recorder.Eventf(ref, "failed",
"Failed to start with docker id %v with error: %v", dockerContainer.ID, err)
}
return "", err
}
if ref != nil {
r.Recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID)
}
return dockerContainer.ID, nil
}
func setEntrypointAndCommand(container *api.Container, opts *docker.CreateContainerOptions) {
if len(container.Command) != 0 {
opts.Config.Entrypoint = container.Command
}
if len(container.Args) != 0 {
opts.Config.Cmd = container.Args
}
}


@@ -212,10 +212,7 @@ func NewMainKubelet(
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
statusManager := newStatusManager(kubeClient)
containerRunner := &dockertools.DockerContainerRunner{
Client: dockerClient,
Recorder: recorder,
}
containerManager := dockertools.NewDockerManager(dockerClient, recorder)
klet := &Kubelet{
hostname: hostname,
@@ -245,7 +242,7 @@
statusManager: statusManager,
cloud: cloud,
nodeRef: nodeRef,
containerRunner: containerRunner,
containerManager: containerManager,
}
klet.podManager = newBasicPodManager(klet.kubeClient)
@@ -362,14 +359,14 @@ type Kubelet struct {
// Syncs pods statuses with apiserver; also used as a cache of statuses.
statusManager *statusManager
// Knows how to run a container in a pod
containerRunner kubecontainer.ContainerRunner
// Cloud provider interface
cloud cloudprovider.Interface
// Reference to this node.
nodeRef *api.ObjectReference
// Manage containers.
containerManager *dockertools.DockerManager
}
// getRootDir returns the full path to the directory under which kubelet can
@@ -665,7 +662,7 @@ func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolum
return "", err
}
id, err := kl.containerRunner.RunContainer(pod, container, opts)
id, err := kl.containerManager.RunContainer(pod, container, opts)
if err != nil {
return "", err
}
@@ -1002,7 +999,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
func (kl *Kubelet) shouldContainerBeRestarted(container *api.Container, pod *api.Pod) bool {
podFullName := kubecontainer.GetPodFullName(pod)
// Check RestartPolicy for dead container
recentContainers, err := dockertools.GetRecentDockerContainersWithNameAndUUID(kl.dockerClient, podFullName, pod.UID, container.Name)
recentContainers, err := kl.containerManager.GetRecentDockerContainersWithNameAndUUID(podFullName, pod.UID, container.Name)
if err != nil {
glog.Errorf("Error listing recent containers for pod %q: %v", podFullName, err)
// TODO(dawnchen): error handling here?
@@ -1497,7 +1494,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}
}
running, err := dockertools.GetRunningContainers(kl.dockerClient, killed)
running, err := kl.containerManager.GetRunningContainers(killed)
if err != nil {
glog.Errorf("Failed to poll container state: %v", err)
return err
@@ -1697,7 +1694,7 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri
if err != nil {
return err
}
return dockertools.GetKubeletDockerContainerLogs(kl.dockerClient, dockerContainerID, tail, follow, stdout, stderr)
return kl.containerManager.GetKubeletDockerContainerLogs(dockerContainerID, tail, follow, stdout, stderr)
}
// GetHostname Returns the hostname as the kubelet sees it.
@@ -1946,7 +1943,7 @@ func (kl *Kubelet) generatePodStatusByPod(pod *api.Pod) (api.PodStatus, error) {
glog.V(3).Infof("Generating status for %q", podFullName)
spec := &pod.Spec
podStatus, err := dockertools.GetDockerPodStatus(kl.dockerClient, *spec, podFullName, pod.UID)
podStatus, err := kl.containerManager.GetPodStatus(pod)
if err != nil {
// Error handling


@@ -113,7 +113,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
podManager, fakeMirrorClient := newFakePodManager()
kubelet.podManager = podManager
kubelet.containerRefManager = kubecontainer.NewRefManager()
kubelet.containerRunner = &dockertools.DockerContainerRunner{fakeDocker, fakeRecorder}
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder)
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
}


@@ -140,7 +140,7 @@ func TestRunOnce(t *testing.T) {
t: t,
}
kb.dockerPuller = &dockertools.FakeDockerPuller{}
kb.containerRunner = &dockertools.DockerContainerRunner{kb.dockerClient, kb.recorder}
kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder)
results, err := kb.runOnce([]api.Pod{
{
ObjectMeta: api.ObjectMeta{