k3s/pkg/kubelet/kubelet.go

1213 lines
39 KiB
Go
Raw Normal View History

2014-06-06 23:40:48 +00:00
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
2014-09-24 21:27:10 +00:00
"io"
"io/ioutil"
"net"
2014-06-06 23:40:48 +00:00
"net/http"
"os"
"path"
2014-10-28 00:29:55 +00:00
"sort"
2014-06-06 23:40:48 +00:00
"strconv"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
2014-09-01 05:10:49 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
2014-07-15 18:39:19 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
2014-06-06 23:40:48 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
2014-06-06 23:40:48 +00:00
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
2014-06-06 23:40:48 +00:00
)
const defaultChanSize = 1024
// taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
const minShares = 2
const sharesPerCPU = 1024
const milliCPUToCPU = 1000
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
SyncPods([]api.BoundPod) error
}
type SourcesReadyFn func() bool
type volumeMap map[string]volume.Interface
// New creates a new Kubelet for use in main
func NewMainKubelet(
hn string,
dc dockertools.DockerInterface,
ec tools.EtcdClient,
rd string,
ni string,
ri time.Duration,
pullQPS float32,
2014-10-28 00:29:55 +00:00
pullBurst int,
minimumGCAge time.Duration,
maxContainerCount int,
sourcesReady SourcesReadyFn,
clusterDomain string,
clusterDNS net.IP) *Kubelet {
return &Kubelet{
hostname: hn,
dockerClient: dc,
etcdClient: ec,
rootDirectory: rd,
resyncInterval: ri,
networkContainerImage: ni,
podWorkers: newPodWorkers(),
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dc),
httpClient: &http.Client{},
pullQPS: pullQPS,
pullBurst: pullBurst,
2014-10-28 00:29:55 +00:00
minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount,
sourcesReady: sourcesReady,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
}
}
2014-10-20 03:15:23 +00:00
type httpGetter interface {
Get(url string) (*http.Response, error)
}
2014-07-10 12:26:24 +00:00
// Kubelet is the main kubelet implementation.
2014-06-06 23:40:48 +00:00
type Kubelet struct {
hostname string
dockerClient dockertools.DockerInterface
rootDirectory string
networkContainerImage string
2014-10-10 00:16:21 +00:00
podWorkers *podWorkers
resyncInterval time.Duration
pods []api.BoundPod
sourcesReady SourcesReadyFn
// Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events
dockerIDToRef map[dockertools.DockerID]*api.ObjectReference
refLock sync.RWMutex
// Tracks active pulls. Needed to protect image garbage collection
// See: https://github.com/docker/docker/issues/8926 for details
// TODO: Remove this when (if?) that issue is fixed.
pullLock sync.RWMutex
// Optional, no events will be sent without it
etcdClient tools.EtcdClient
// Optional, defaults to simple implementaiton
healthChecker health.HealthChecker
// Optional, defaults to simple Docker implementation
dockerPuller dockertools.DockerPuller
// Optional, defaults to /logs/ from /var/log
logServer http.Handler
2014-08-07 18:15:11 +00:00
// Optional, defaults to simple Docker implementation
runner dockertools.ContainerCommandRunner
// Optional, client for http requests, defaults to empty client
2014-10-20 03:15:23 +00:00
httpClient httpGetter
// Optional, maximum pull QPS from the docker registry, 0.0 means unlimited.
pullQPS float32
// Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0
pullBurst int
// Optional, no statistics will be available if omitted
2014-10-20 03:54:52 +00:00
cadvisorClient cadvisorInterface
cadvisorLock sync.RWMutex
2014-10-28 00:29:55 +00:00
// Optional, minimum age required for garbage collection. If zero, no limit.
minimumGCAge time.Duration
maxContainerCount int
// If non-empty, use this for container DNS search.
clusterDomain string
// If non-nil, use this for container DNS server.
clusterDNS net.IP
2014-10-28 00:29:55 +00:00
}
// GetRootDir returns the full path to the directory under which kubelet can
// store data. These functions are useful to pass interfaces to other modules
// that may need to know where to write data without getting a whole kubelet
// instance.
func (kl *Kubelet) GetRootDir() string {
return kl.rootDirectory
}
// GetPodsDir returns the full path to the directory under which pod
// directories are created.
// TODO(thockin): For now, this is the same as the root because that is assumed
// in other code. Will fix.
func (kl *Kubelet) GetPodsDir() string {
return kl.GetRootDir()
}
// GetPodDir returns the full path to the per-pod data directory for the
// specified pod. This directory may not exist if the pod does not exist.
func (kl *Kubelet) GetPodDir(podUID string) string {
return path.Join(kl.GetRootDir(), podUID)
}
// GetPodVolumesDir returns the full path to the per-pod data directory under
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) GetPodVolumesDir(podUID string) string {
return path.Join(kl.GetPodDir(podUID), "volumes")
}
// GetPodContainerDir returns the full path to the per-pod data directory under
// which container data is held for the specified pod. This directory may not
// exist if the pod or container does not exist.
func (kl *Kubelet) GetPodContainerDir(podUID, ctrName string) string {
return path.Join(kl.GetPodDir(podUID), ctrName)
}
2014-10-28 00:29:55 +00:00
type ByCreated []*docker.Container
func (a ByCreated) Len() int { return len(a) }
func (a ByCreated) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByCreated) Less(i, j int) bool { return a[i].Created.After(a[j].Created) }
// TODO: these removals are racy, we should make dockerclient threadsafe across List/Inspect transactions.
func (kl *Kubelet) purgeOldest(ids []string) error {
dockerData := []*docker.Container{}
for _, id := range ids {
data, err := kl.dockerClient.InspectContainer(id)
if err != nil {
return err
}
if !data.State.Running && (kl.minimumGCAge == 0 || time.Now().Sub(data.State.FinishedAt) > kl.minimumGCAge) {
dockerData = append(dockerData, data)
}
}
sort.Sort(ByCreated(dockerData))
if len(dockerData) <= kl.maxContainerCount {
return nil
}
dockerData = dockerData[kl.maxContainerCount:]
for _, data := range dockerData {
if err := kl.dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: data.ID}); err != nil {
return err
}
}
return nil
}
func (kl *Kubelet) GarbageCollectLoop() {
util.Forever(func() {
if err := kl.GarbageCollectContainers(); err != nil {
glog.Errorf("Garbage collect failed: %v", err)
}
if err := kl.GarbageCollectImages(); err != nil {
glog.Errorf("Garbage collect images failed: %v", err)
}
}, time.Minute*1)
}
func (kl *Kubelet) getUnusedImages() ([]string, error) {
kl.pullLock.Lock()
defer kl.pullLock.Unlock()
return dockertools.GetUnusedImages(kl.dockerClient)
}
func (kl *Kubelet) GarbageCollectImages() error {
images, err := kl.getUnusedImages()
if err != nil {
return err
}
for ix := range images {
if err := kl.dockerClient.RemoveImage(images[ix]); err != nil {
glog.Errorf("Failed to remove image: %s (%v)", images[ix], err)
}
}
return nil
}
2014-10-28 00:29:55 +00:00
// TODO: Also enforce a maximum total number of containers.
func (kl *Kubelet) GarbageCollectContainers() error {
if kl.maxContainerCount == 0 {
return nil
}
containers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, true)
if err != nil {
return err
}
uuidToIDMap := map[string][]string{}
for _, container := range containers {
_, uuid, name, _ := dockertools.ParseDockerName(container.ID)
uuidName := uuid + "." + name
uuidToIDMap[uuidName] = append(uuidToIDMap[uuidName], container.ID)
}
for _, list := range uuidToIDMap {
if len(list) <= kl.maxContainerCount {
continue
}
if err := kl.purgeOldest(list); err != nil {
return err
}
}
return nil
}
// SetCadvisorClient sets the cadvisor client in a thread-safe way.
2014-10-20 03:54:52 +00:00
func (kl *Kubelet) SetCadvisorClient(c cadvisorInterface) {
kl.cadvisorLock.Lock()
defer kl.cadvisorLock.Unlock()
kl.cadvisorClient = c
}
// GetCadvisorClient gets the cadvisor client.
2014-10-20 03:54:52 +00:00
func (kl *Kubelet) GetCadvisorClient() cadvisorInterface {
kl.cadvisorLock.RLock()
defer kl.cadvisorLock.RUnlock()
return kl.cadvisorClient
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
if kl.healthChecker == nil {
kl.healthChecker = health.NewHealthChecker()
2014-06-06 23:40:48 +00:00
}
kl.syncLoop(updates, kl)
2014-06-06 23:40:48 +00:00
}
// Per-pod workers.
type podWorkers struct {
lock sync.Mutex
// Set of pods with existing workers.
workers util.StringSet
}
2014-10-10 00:16:21 +00:00
func newPodWorkers() *podWorkers {
return &podWorkers{
workers: util.NewStringSet(),
}
}
// Runs a worker for "podFullName" asynchronously with the specified "action".
// If the worker for the "podFullName" is already running, functions as a no-op.
func (self *podWorkers) Run(podFullName string, action func()) {
self.lock.Lock()
defer self.lock.Unlock()
// This worker is already running, let it finish.
if self.workers.Has(podFullName) {
return
}
self.workers.Insert(podFullName)
// Run worker async.
go func() {
defer util.HandleCrash()
action()
self.lock.Lock()
defer self.lock.Unlock()
self.workers.Delete(podFullName)
}()
}
func makeEnvironmentVariables(container *api.Container) []string {
var result []string
2014-06-06 23:40:48 +00:00
for _, value := range container.Env {
result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value))
2014-06-06 23:40:48 +00:00
}
return result
}
2014-06-06 23:40:48 +00:00
func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
2014-06-06 23:40:48 +00:00
binds := []string{}
2014-08-27 05:08:06 +00:00
for _, mount := range container.VolumeMounts {
vol, ok := podVolumes[mount.Name]
if !ok {
continue
2014-06-19 23:59:48 +00:00
}
2014-08-27 05:08:06 +00:00
b := fmt.Sprintf("%s:%s", vol.GetPath(), mount.MountPath)
if mount.ReadOnly {
b += ":ro"
2014-06-06 23:40:48 +00:00
}
2014-08-27 05:08:06 +00:00
binds = append(binds, b)
2014-06-06 23:40:48 +00:00
}
2014-08-27 05:08:06 +00:00
return binds
}
func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
2014-06-06 23:40:48 +00:00
exposedPorts := map[docker.Port]struct{}{}
portBindings := map[docker.Port][]docker.PortBinding{}
for _, port := range container.Ports {
exteriorPort := port.HostPort
if exteriorPort == 0 {
// No need to do port binding when HostPort is not specified
continue
}
interiorPort := port.ContainerPort
2014-06-06 23:40:48 +00:00
// Some of this port stuff is under-documented voodoo.
// See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api
2014-06-16 04:19:35 +00:00
var protocol string
2014-09-28 03:31:37 +00:00
switch strings.ToUpper(string(port.Protocol)) {
case "UDP":
2014-06-16 04:19:35 +00:00
protocol = "/udp"
case "TCP":
2014-06-16 04:19:35 +00:00
protocol = "/tcp"
default:
glog.Warningf("Unknown protocol '%s': defaulting to TCP", port.Protocol)
2014-06-16 04:19:35 +00:00
protocol = "/tcp"
}
dockerPort := docker.Port(strconv.Itoa(interiorPort) + protocol)
2014-06-06 23:40:48 +00:00
exposedPorts[dockerPort] = struct{}{}
portBindings[dockerPort] = []docker.PortBinding{
2014-06-12 21:09:40 +00:00
{
2014-06-06 23:40:48 +00:00
HostPort: strconv.Itoa(exteriorPort),
HostIP: port.HostIP,
2014-06-06 23:40:48 +00:00
},
}
}
return exposedPorts, portBindings
}
func milliCPUToShares(milliCPU int) int {
2014-07-29 18:34:16 +00:00
if milliCPU == 0 {
// zero milliCPU means unset. Use kernel default.
return 0
}
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
if shares < minShares {
return minShares
}
return shares
}
func (kl *Kubelet) mountExternalVolumes(pod *api.BoundPod) (volumeMap, error) {
podVolumes := make(volumeMap)
for _, vol := range pod.Spec.Volumes {
2014-10-22 17:02:02 +00:00
extVolume, err := volume.CreateVolumeBuilder(&vol, pod.Name, kl.rootDirectory)
if err != nil {
return nil, err
}
// TODO(jonesdl) When the default volume behavior is no longer supported, this case
// should never occur and an error should be thrown instead.
if extVolume == nil {
continue
}
podVolumes[vol.Name] = extVolume
err = extVolume.SetUp()
if err != nil {
return nil, err
}
}
return podVolumes, nil
}
// A basic interface that knows how to execute handlers
type actionHandler interface {
2014-09-05 09:49:11 +00:00
Run(podFullName, uuid string, container *api.Container, handler *api.Handler) error
}
func (kl *Kubelet) newActionHandler(handler *api.Handler) actionHandler {
switch {
case handler.Exec != nil:
return &execActionHandler{kubelet: kl}
case handler.HTTPGet != nil:
return &httpActionHandler{client: kl.httpClient, kubelet: kl}
default:
2014-10-07 20:53:25 +00:00
glog.Errorf("Invalid handler: %v", handler)
return nil
}
}
2014-09-05 09:49:11 +00:00
func (kl *Kubelet) runHandler(podFullName, uuid string, container *api.Container, handler *api.Handler) error {
actionHandler := kl.newActionHandler(handler)
if actionHandler == nil {
return fmt.Errorf("invalid handler")
}
2014-09-05 09:49:11 +00:00
return actionHandler.Run(podFullName, uuid, container, handler)
}
// fieldPath returns a fieldPath locating container within pod.
// Returns an error if the container isn't part of the pod.
func fieldPath(pod *api.BoundPod, container *api.Container) (string, error) {
for i := range pod.Spec.Containers {
here := &pod.Spec.Containers[i]
if here.Name == container.Name {
if here.Name == "" {
return fmt.Sprintf("spec.containers[%d]", i), nil
} else {
return fmt.Sprintf("spec.containers{%s}", here.Name), nil
}
}
}
return "", fmt.Errorf("container %#v not found in pod %#v", container, pod)
}
// containerRef returns an *api.ObjectReference which references the given container within the
// given pod. Returns an error if the reference can't be constructed or the container doesn't
// actually belong to the pod.
// TODO: Pods that came to us by static config or over HTTP have no selfLink set, which makes
// this fail and log an error. Figure out how we want to identify these pods to the rest of the
// system.
func containerRef(pod *api.BoundPod, container *api.Container) (*api.ObjectReference, error) {
fieldPath, err := fieldPath(pod, container)
if err != nil {
// TODO: figure out intelligent way to refer to containers that we implicitly
// start (like the network container). This is not a good way, ugh.
fieldPath = "implicitly required container " + container.Name
}
ref, err := api.GetPartialReference(pod, fieldPath)
if err != nil {
return nil, err
}
return ref, nil
}
// setRef stores a reference to a pod's container, associating it with the given docker id.
func (kl *Kubelet) setRef(id dockertools.DockerID, ref *api.ObjectReference) {
kl.refLock.Lock()
defer kl.refLock.Unlock()
if kl.dockerIDToRef == nil {
kl.dockerIDToRef = map[dockertools.DockerID]*api.ObjectReference{}
}
kl.dockerIDToRef[id] = ref
}
// clearRef forgets the given docker id and its associated container reference.
func (kl *Kubelet) clearRef(id dockertools.DockerID) {
kl.refLock.Lock()
defer kl.refLock.Unlock()
delete(kl.dockerIDToRef, id)
}
// getRef returns the container reference of the given id, or (nil, false) if none is stored.
func (kl *Kubelet) getRef(id dockertools.DockerID) (ref *api.ObjectReference, ok bool) {
kl.refLock.RLock()
defer kl.refLock.RUnlock()
ref, ok = kl.dockerIDToRef[id]
return ref, ok
}
// Run a single container from a pod. Returns the docker container ID
func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, podVolumes volumeMap, netMode string) (id dockertools.DockerID, err error) {
2014-11-14 19:34:41 +00:00
ref, err := containerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
envVariables := makeEnvironmentVariables(container)
2014-08-27 05:08:06 +00:00
binds := makeBinds(pod, container, podVolumes)
exposedPorts, portBindings := makePortsAndBindings(container)
2014-06-06 23:40:48 +00:00
opts := docker.CreateContainerOptions{
Name: dockertools.BuildDockerName(pod.UID, GetPodFullName(pod), container),
2014-06-06 23:40:48 +00:00
Config: &docker.Config{
Cmd: container.Command,
Env: envVariables,
ExposedPorts: exposedPorts,
2014-10-22 17:02:02 +00:00
Hostname: pod.Name,
2014-06-06 23:40:48 +00:00
Image: container.Image,
Memory: int64(container.Memory),
CPUShares: int64(milliCPUToShares(container.CPU)),
2014-06-06 23:40:48 +00:00
WorkingDir: container.WorkingDir,
},
}
dockerContainer, err := kl.dockerClient.CreateContainer(opts)
2014-06-06 23:40:48 +00:00
if err != nil {
2014-11-14 19:34:41 +00:00
if ref != nil {
record.Eventf(ref, "failed", "failed",
2014-11-19 21:57:54 +00:00
"Failed to create docker container with error: %v", err)
2014-11-14 19:34:41 +00:00
}
2014-06-06 23:40:48 +00:00
return "", err
}
2014-11-14 19:34:41 +00:00
// Remember this reference so we can report events about this container
if ref != nil {
kl.setRef(dockertools.DockerID(dockerContainer.ID), ref)
record.Eventf(ref, "waiting", "created", "Created with docker id %v", dockerContainer.ID)
}
if len(container.TerminationMessagePath) != 0 {
p := kl.GetPodContainerDir(pod.UID, container.Name)
if err := os.MkdirAll(p, 0750); err != nil {
glog.Errorf("Error on creating %s: %v", p, err)
} else {
containerLogPath := path.Join(p, dockerContainer.ID)
fs, err := os.Create(containerLogPath)
if err != nil {
glog.Errorf("Error on creating termination-log file %s: %v", containerLogPath, err)
}
defer fs.Close()
b := fmt.Sprintf("%s:%s", containerLogPath, container.TerminationMessagePath)
binds = append(binds, b)
}
}
privileged := false
2014-09-16 22:18:33 +00:00
if capabilities.Get().AllowPrivileged {
privileged = container.Privileged
} else if container.Privileged {
return "", fmt.Errorf("container requested privileged mode, but it is disallowed globally.")
}
hc := &docker.HostConfig{
2014-06-06 23:40:48 +00:00
PortBindings: portBindings,
Binds: binds,
NetworkMode: netMode,
Privileged: privileged,
}
if pod.Spec.DNSPolicy == api.DNSClusterFirst {
if err := kl.applyClusterDNS(hc, pod); err != nil {
return "", err
}
}
err = kl.dockerClient.StartContainer(dockerContainer.ID, hc)
if err != nil {
2014-11-14 19:34:41 +00:00
if ref != nil {
record.Eventf(ref, "failed", "failed",
2014-11-19 21:57:54 +00:00
"Failed to start with docker id %v with error: %v", dockerContainer.ID, err)
2014-11-14 19:34:41 +00:00
}
return "", err
}
2014-11-14 19:34:41 +00:00
if ref != nil {
2014-11-11 07:53:48 +00:00
record.Eventf(ref, "running", "started", "Started with docker id %v", dockerContainer.ID)
}
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
handlerErr := kl.runHandler(GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart)
if handlerErr != nil {
kl.killContainerByID(dockerContainer.ID, "")
return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
}
}
return dockertools.DockerID(dockerContainer.ID), err
2014-06-06 23:40:48 +00:00
}
func (kl *Kubelet) applyClusterDNS(hc *docker.HostConfig, pod *api.BoundPod) error {
// Get host DNS settings and append them to cluster DNS settings.
f, err := os.Open("/etc/resolv.conf")
if err != nil {
return err
}
defer f.Close()
hostDNS, hostSearch, err := parseResolvConf(f)
if err != nil {
return err
}
if kl.clusterDNS != nil {
hc.DNS = append([]string{kl.clusterDNS.String()}, hostDNS...)
}
if kl.clusterDomain != "" {
nsDomain := fmt.Sprintf("%s.%s", pod.Namespace, kl.clusterDomain)
hc.DNSSearch = append([]string{nsDomain, kl.clusterDomain}, hostSearch...)
}
return nil
}
// Returns the list of DNS servers and DNS search domains.
func parseResolvConf(reader io.Reader) (nameservers []string, searches []string, err error) {
file, err := ioutil.ReadAll(reader)
if err != nil {
return nil, nil, err
}
// Lines of the form "nameserver 1.2.3.4" accumulate.
nameservers = []string{}
// Lines of the form "search example.com" overrule - last one wins.
searches = []string{}
lines := strings.Split(string(file), "\n")
for l := range lines {
trimmed := strings.TrimSpace(lines[l])
if strings.HasPrefix(trimmed, "#") {
continue
}
fields := strings.Fields(trimmed)
if len(fields) == 0 {
continue
}
if fields[0] == "nameserver" {
nameservers = append(nameservers, fields[1:]...)
}
if fields[0] == "search" {
searches = fields[1:]
}
}
return nameservers, searches, nil
}
// Kill a docker container
func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
return kl.killContainerByID(dockerContainer.ID, dockerContainer.Names[0])
}
func (kl *Kubelet) killContainerByID(ID, name string) error {
glog.V(2).Infof("Killing: %s", ID)
err := kl.dockerClient.StopContainer(ID, 10)
if len(name) == 0 {
return err
}
ref, ok := kl.getRef(dockertools.DockerID(ID))
if !ok {
glog.Warningf("No ref for pod '%v' - '%v'", ID, name)
} else {
// TODO: pass reason down here, and state, or move this call up the stack.
2014-11-11 07:53:48 +00:00
record.Eventf(ref, "terminated", "killing", "Killing %v - %v", ID, name)
}
2014-06-06 23:40:48 +00:00
return err
}
const (
networkContainerName = "net"
NetworkContainerImage = "kubernetes/pause:latest"
)
// createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container.
func (kl *Kubelet) createNetworkContainer(pod *api.BoundPod) (dockertools.DockerID, error) {
var ports []api.Port
// Docker only exports ports from the network container. Let's
// collect all of the relevant ports and export them.
for _, container := range pod.Spec.Containers {
ports = append(ports, container.Ports...)
}
container := &api.Container{
Name: networkContainerName,
Image: kl.networkContainerImage,
Ports: ports,
}
2014-11-19 21:57:54 +00:00
ref, err := containerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
// TODO: make this a TTL based pull (if image older than X policy, pull)
ok, err := kl.dockerPuller.IsImagePresent(container.Image)
if err != nil {
2014-11-19 21:57:54 +00:00
if ref != nil {
record.Eventf(ref, "failed", "failed", "Failed to inspect image %s", container.Image)
}
return "", err
}
if !ok {
if err := kl.pullImage(container.Image, ref); err != nil {
return "", err
}
}
2014-11-19 21:57:54 +00:00
if ref != nil {
record.Eventf(ref, "waiting", "pulled", "Successfully pulled image %s", container.Image)
}
return kl.runContainer(pod, container, nil, "")
}
func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
kl.pullLock.RLock()
defer kl.pullLock.RUnlock()
if err := kl.dockerPuller.Pull(img); err != nil {
if ref != nil {
record.Eventf(ref, "failed", "failed", "Failed to pull image %s", img)
}
return err
}
return nil
}
2014-10-21 05:03:23 +00:00
// Kill all containers in a pod. Returns the number of containers deleted and an error if one occurs.
func (kl *Kubelet) killContainersInPod(pod *api.BoundPod, dockerContainers dockertools.DockerContainers) (int, error) {
podFullName := GetPodFullName(pod)
count := 0
errs := make(chan error, len(pod.Spec.Containers))
wg := sync.WaitGroup{}
for _, container := range pod.Spec.Containers {
2014-10-22 04:37:15 +00:00
// TODO: Consider being more aggressive: kill all containers with this pod UID, period.
if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.UID, container.Name); found {
count++
wg.Add(1)
go func() {
err := kl.killContainer(dockerContainer)
if err != nil {
glog.Errorf("Failed to delete container: %v; Skipping pod %s", err, podFullName)
errs <- err
}
wg.Done()
}()
}
}
wg.Wait()
close(errs)
if len(errs) > 0 {
errList := []error{}
for err := range errs {
errList = append(errList, err)
}
return -1, fmt.Errorf("failed to delete containers (%v)", errList)
}
return count, nil
}
type empty struct{}
func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.DockerContainers) error {
podFullName := GetPodFullName(pod)
uuid := pod.UID
containersToKeep := make(map[dockertools.DockerID]empty)
killedContainers := make(map[dockertools.DockerID]empty)
// Make sure we have a network container
var netID dockertools.DockerID
2014-10-21 05:03:23 +00:00
if netDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found {
netID = dockertools.DockerID(netDockerContainer.ID)
} else {
2014-10-22 04:37:15 +00:00
glog.V(3).Infof("Network container doesn't exist for pod %q, re-creating the pod", podFullName)
2014-10-21 05:03:23 +00:00
count, err := kl.killContainersInPod(pod, dockerContainers)
if err != nil {
return err
}
2014-10-21 05:03:23 +00:00
netID, err = kl.createNetworkContainer(pod)
if err != nil {
glog.Errorf("Failed to introspect network container: %v; Skipping pod %s", err, podFullName)
2014-07-01 05:27:56 +00:00
return err
}
if count > 0 {
2014-10-21 05:03:23 +00:00
// Re-list everything, otherwise we'll think we're ok.
2014-09-29 21:38:31 +00:00
dockerContainers, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
glog.Errorf("Error listing containers %#v", dockerContainers)
return err
}
}
2014-07-01 05:27:56 +00:00
}
containersToKeep[netID] = empty{}
podVolumes, err := kl.mountExternalVolumes(pod)
if err != nil {
glog.Errorf("Unable to mount volumes for pod %s: %v; skipping pod", podFullName, err)
return err
}
podStatus := api.PodStatus{}
2014-09-05 09:49:11 +00:00
info, err := kl.GetPodInfo(podFullName, uuid)
2014-08-01 00:35:54 +00:00
if err != nil {
2014-10-21 05:03:23 +00:00
glog.Errorf("Unable to get pod with name %s and uuid %s info, health checks may be invalid", podFullName, uuid)
2014-08-01 00:35:54 +00:00
}
netInfo, found := info[networkContainerName]
if found {
podStatus.PodIP = netInfo.PodIP
2014-08-01 00:35:54 +00:00
}
for _, container := range pod.Spec.Containers {
expectedHash := dockertools.HashContainer(&container)
2014-09-05 09:49:11 +00:00
if dockerContainer, found, hash := dockerContainers.FindPodContainer(podFullName, uuid, container.Name); found {
containerID := dockertools.DockerID(dockerContainer.ID)
glog.V(3).Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)
// look for changes in the container.
if hash == 0 || hash == expectedHash {
// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(podFullName, uuid, podStatus, container, dockerContainer)
if err != nil {
glog.V(1).Infof("health check errored: %v", err)
containersToKeep[containerID] = empty{}
continue
}
if healthy == health.Healthy {
containersToKeep[containerID] = empty{}
continue
}
glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy)
} else {
glog.V(3).Infof("container hash changed %d vs %d.", hash, expectedHash)
}
if err := kl.killContainer(dockerContainer); err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
continue
2014-07-03 05:35:50 +00:00
}
killedContainers[containerID] = empty{}
// Also kill associated network container
if netContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found {
if err := kl.killContainer(netContainer); err != nil {
glog.V(1).Infof("Failed to kill network container %s: %v", netContainer.ID, err)
continue
}
}
2014-07-01 05:27:56 +00:00
}
// Check RestartPolicy for container
recentContainers, err := dockertools.GetRecentDockerContainersWithNameAndUUID(kl.dockerClient, podFullName, uuid, container.Name)
if err != nil {
glog.Errorf("Error listing recent containers with name and uuid:%s--%s--%s", podFullName, uuid, container.Name)
// TODO(dawnchen): error handling here?
}
if len(recentContainers) > 0 && pod.Spec.RestartPolicy.Always == nil {
if pod.Spec.RestartPolicy.Never != nil {
glog.V(3).Infof("Already ran container with name %s--%s--%s, do nothing",
podFullName, uuid, container.Name)
continue
}
if pod.Spec.RestartPolicy.OnFailure != nil {
// Check the exit code of last run
if recentContainers[0].State.ExitCode == 0 {
glog.V(3).Infof("Already successfully ran container with name %s--%s--%s, do nothing",
podFullName, uuid, container.Name)
continue
}
}
}
glog.V(3).Infof("Container with name %s--%s--%s doesn't exist, creating %#v", podFullName, uuid, container.Name, container)
2014-11-19 21:57:54 +00:00
ref, err := containerRef(pod, &container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
if !api.IsPullNever(container.ImagePullPolicy) {
present, err := kl.dockerPuller.IsImagePresent(container.Image)
latest := dockertools.RequireLatestImage(container.Image)
if err != nil {
2014-11-19 21:57:54 +00:00
if ref != nil {
record.Eventf(ref, "failed", "failed", "Failed to inspect image %s", container.Image)
}
glog.Errorf("Failed to inspect image %s: %v; skipping pod %s container %s", container.Image, err, podFullName, container.Name)
continue
}
if api.IsPullAlways(container.ImagePullPolicy) ||
(api.IsPullIfNotPresent(container.ImagePullPolicy) && (!present || latest)) {
if err := kl.dockerPuller.Pull(container.Image); err != nil {
2014-11-19 21:57:54 +00:00
if ref != nil {
record.Eventf(ref, "failed", "failed", "Failed to pull image %s", container.Image)
}
glog.Errorf("Failed to pull image %s: %v; skipping pod %s container %s.", container.Image, err, podFullName, container.Name)
continue
}
2014-11-19 21:57:54 +00:00
if ref != nil {
record.Eventf(ref, "waiting", "pulled", "Successfully pulled image %s", container.Image)
}
}
}
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
containerID, err := kl.runContainer(pod, &container, podVolumes, "container:"+string(netID))
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running pod %s container %s: %v", podFullName, container.Name, err)
continue
}
containersToKeep[containerID] = empty{}
}
// Kill any containers in this pod which were not identified above (guards against duplicates).
for id, container := range dockerContainers {
curPodFullName, curUUID, _, _ := dockertools.ParseDockerName(container.Names[0])
2014-09-05 09:49:11 +00:00
if curPodFullName == podFullName && curUUID == uuid {
// Don't kill containers we want to keep or those we already killed.
_, keep := containersToKeep[id]
_, killed := killedContainers[id]
if !keep && !killed {
2014-10-22 04:37:15 +00:00
glog.V(1).Infof("Killing unwanted container in pod %q: %+v", curUUID, container)
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}
}
}
2014-06-06 23:40:48 +00:00
}
2014-07-01 05:27:56 +00:00
return nil
}
type podContainer struct {
podFullName string
2014-09-05 09:49:11 +00:00
uuid string
containerName string
}
2014-07-03 01:06:54 +00:00
// Stores all volumes defined by the set of pods into a map.
// Keys for each entry are in the format (POD_ID)/(VOLUME_NAME)
func getDesiredVolumes(pods []api.BoundPod) map[string]api.Volume {
desiredVolumes := make(map[string]api.Volume)
for _, pod := range pods {
for _, volume := range pod.Spec.Volumes {
2014-10-22 17:02:02 +00:00
identifier := path.Join(pod.Name, volume.Name)
desiredVolumes[identifier] = volume
}
}
return desiredVolumes
}
// Compares the map of current volumes to the map of desired volumes.
// If an active volume does not have a respective desired volume, clean it up.
func (kl *Kubelet) reconcileVolumes(pods []api.BoundPod) error {
desiredVolumes := getDesiredVolumes(pods)
currentVolumes := volume.GetCurrentVolumes(kl.rootDirectory)
for name, vol := range currentVolumes {
if _, ok := desiredVolumes[name]; !ok {
//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
//to be deleted and volumes that are leftover after a crash.
glog.Warningf("Orphaned volume %s found, tearing down volume", name)
//TODO (jonesdl) This should not block other kubelet synchronization procedures
err := vol.TearDown()
if err != nil {
glog.Errorf("Could not tear down volume %s: %v", name, err)
}
}
}
return nil
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
2014-10-20 04:26:15 +00:00
glog.V(4).Infof("Desired: %#v", pods)
var err error
desiredContainers := make(map[podContainer]empty)
2014-10-22 04:37:15 +00:00
desiredPods := make(map[string]empty)
2014-07-01 05:27:56 +00:00
2014-09-29 21:38:31 +00:00
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
2014-10-20 04:26:15 +00:00
glog.Errorf("Error listing containers: %#v", dockerContainers)
return err
}
2014-07-01 05:27:56 +00:00
// Check for any containers that need starting
for ix := range pods {
pod := &pods[ix]
podFullName := GetPodFullName(pod)
uuid := pod.UID
2014-10-22 04:37:15 +00:00
desiredPods[uuid] = empty{}
// Add all containers (including net) to the map.
2014-09-05 09:49:11 +00:00
desiredContainers[podContainer{podFullName, uuid, networkContainerName}] = empty{}
for _, cont := range pod.Spec.Containers {
2014-09-05 09:49:11 +00:00
desiredContainers[podContainer{podFullName, uuid, cont.Name}] = empty{}
}
// Run the sync in an async manifest worker.
kl.podWorkers.Run(podFullName, func() {
err := kl.syncPod(pod, dockerContainers)
2014-07-01 05:27:56 +00:00
if err != nil {
glog.Errorf("Error syncing pod, skipping: %v", err)
record.Eventf(pod, "", "failedSync", "Error syncing pod, skipping: %v", err)
2014-07-01 05:27:56 +00:00
}
})
2014-07-01 16:37:45 +00:00
}
if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
return nil
}
2014-10-20 04:26:15 +00:00
// Kill any containers we don't need.
2014-10-20 04:35:08 +00:00
for _, container := range dockerContainers {
// Don't kill containers that are in the desired pods.
podFullName, uuid, containerName, _ := dockertools.ParseDockerName(container.Names[0])
2014-10-22 04:37:15 +00:00
if _, found := desiredPods[uuid]; found {
// syncPod() will handle this one.
continue
}
2014-10-20 04:26:15 +00:00
pc := podContainer{podFullName, uuid, containerName}
if _, ok := desiredContainers[pc]; !ok {
glog.V(1).Infof("Killing unwanted container %+v", pc)
err = kl.killContainer(container)
2014-06-06 23:40:48 +00:00
if err != nil {
glog.Errorf("Error killing container %+v: %v", pc, err)
2014-06-06 23:40:48 +00:00
}
}
}
// Remove any orphaned volumes.
kl.reconcileVolumes(pods)
2014-06-06 23:40:48 +00:00
return err
}
func updateBoundPods(changed []api.BoundPod, current []api.BoundPod) []api.BoundPod {
updated := []api.BoundPod{}
m := map[string]*api.BoundPod{}
for i := range changed {
pod := &changed[i]
m[pod.UID] = pod
}
for i := range current {
pod := &current[i]
if m[pod.UID] != nil {
updated = append(updated, *m[pod.UID])
glog.V(4).Infof("pod with UID: %s has a new spec %+v", pod.UID, *m[pod.UID])
} else {
updated = append(updated, *pod)
glog.V(4).Infof("pod with UID: %s stay with the same spec %+v", pod.UID, *pod)
}
}
return updated
}
// filterHostPortConflicts removes pods that conflict on Port.HostPort values
func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
filtered := []api.BoundPod{}
ports := map[int]bool{}
extract := func(p *api.Port) int { return p.HostPort }
for i := range pods {
pod := &pods[i]
if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
glog.Warningf("Pod %s: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
continue
}
filtered = append(filtered, *pod)
}
return filtered
}
// syncLoop is the main loop for processing changes. It watches for changes from
2014-06-20 16:31:18 +00:00
// four channels (file, etcd, server, and http) and creates a union of them. For
2014-06-06 23:40:48 +00:00
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync_frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
2014-06-06 23:40:48 +00:00
for {
select {
case u := <-updates:
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
case <-time.After(kl.resyncInterval):
2014-10-20 04:26:15 +00:00
glog.V(4).Infof("Periodic sync")
if kl.pods == nil {
continue
}
}
2014-06-20 16:31:18 +00:00
err := handler.SyncPods(kl.pods)
2014-06-06 23:40:48 +00:00
if err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
2014-06-06 23:40:48 +00:00
}
}
}
// GetKubeletContainerLogs returns logs from the container
2014-09-24 23:51:54 +00:00
// The second parameter of GetPodInfo and FindPodContainer methods represents pod UUID, which is allowed to be blank
2014-09-22 20:14:23 +00:00
func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error {
2014-09-24 23:51:54 +00:00
_, err := kl.GetPodInfo(podFullName, "")
if err == dockertools.ErrNoContainersInPod {
return fmt.Errorf("pod not found (%s)\n", podFullName)
2014-09-24 23:51:54 +00:00
}
2014-09-29 21:38:31 +00:00
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, true)
if err != nil {
return err
}
2014-09-24 23:51:54 +00:00
dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, "", containerName)
if !found {
return fmt.Errorf("container not found (%s)\n", containerName)
}
2014-09-24 21:27:10 +00:00
return dockertools.GetKubeletDockerContainerLogs(kl.dockerClient, dockerContainer.ID, tail, follow, stdout, stderr)
}
2014-10-22 23:52:38 +00:00
// GetBoundPods returns all pods bound to the kubelet and their spec
func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) {
return kl.pods, nil
}
// GetPodInfo returns information from Docker about the containers in a pod
2014-09-05 09:49:11 +00:00
func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) {
var manifest api.PodSpec
for _, pod := range kl.pods {
2014-10-06 18:54:51 +00:00
if GetPodFullName(&pod) == podFullName {
manifest = pod.Spec
break
}
}
return dockertools.GetDockerPodInfo(kl.dockerClient, manifest, podFullName, uuid)
}
func (kl *Kubelet) healthy(podFullName, podUUID string, status api.PodStatus, container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
2014-07-03 05:35:50 +00:00
// Give the container 60 seconds to start up.
2014-07-09 23:53:31 +00:00
if container.LivenessProbe == nil {
2014-07-15 18:39:19 +00:00
return health.Healthy, nil
2014-07-03 05:35:50 +00:00
}
if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
2014-07-15 18:39:19 +00:00
return health.Healthy, nil
2014-07-03 05:35:50 +00:00
}
if kl.healthChecker == nil {
2014-07-15 18:39:19 +00:00
return health.Healthy, nil
2014-07-03 05:35:50 +00:00
}
return kl.healthChecker.HealthCheck(podFullName, podUUID, status, container)
2014-07-03 05:35:50 +00:00
}
// Returns logs of current machine.
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// TODO: whitelist logs we are willing to serve
kl.logServer.ServeHTTP(w, req)
}
2014-08-07 18:15:11 +00:00
// Run a command in a container, returns the combined stdout, stderr as an array of bytes
2014-09-05 09:49:11 +00:00
func (kl *Kubelet) RunInContainer(podFullName, uuid, container string, cmd []string) ([]byte, error) {
2014-08-07 18:15:11 +00:00
if kl.runner == nil {
return nil, fmt.Errorf("no runner specified.")
}
2014-09-29 21:38:31 +00:00
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
2014-08-07 18:15:11 +00:00
if err != nil {
return nil, err
}
2014-09-05 09:49:11 +00:00
dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, container)
2014-08-07 18:15:11 +00:00
if !found {
return nil, fmt.Errorf("container not found (%s)", container)
}
return kl.runner.RunInContainer(dockerContainer.ID, cmd)
}
// BirthCry sends an event that the kubelet has started up.
func (kl *Kubelet) BirthCry() {
// Make an event that kubelet restarted.
// TODO: get the real minion object of ourself,
// and use the real minion name and UID.
ref := &api.ObjectReference{
Kind: "Minion",
Name: kl.hostname,
UID: kl.hostname,
Namespace: api.NamespaceDefault,
}
record.Eventf(ref, "", "starting", "Starting kubelet.")
}