k3s/pkg/kubelet/kubelet.go

/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
	"github.com/coreos/go-etcd/etcd"
	"github.com/fsouza/go-dockerclient"
	"github.com/golang/glog"
	cadvisor "github.com/google/cadvisor/client"
	"github.com/google/cadvisor/info"
	"gopkg.in/v1/yaml"
)

const defaultChanSize = 1024

// taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
const minShares = 2
const sharesPerCpu = 1024
const milliCpuToCpu = 1000

// CadvisorInterface is an abstract interface for testability. It abstracts the interface of "github.com/google/cadvisor/client".Client.
type CadvisorInterface interface {
	ContainerInfo(name string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
	MachineInfo() (*info.MachineInfo, error)
}

// New creates a new Kubelet.
func New() *Kubelet {
	return &Kubelet{}
}

// Kubelet is the main kubelet implementation.
type Kubelet struct {
	Hostname           string
	EtcdClient         tools.EtcdClient
	DockerClient       DockerInterface
	DockerPuller       DockerPuller
	CadvisorClient     CadvisorInterface
	FileCheckFrequency time.Duration
	SyncFrequency      time.Duration
	HTTPCheckFrequency time.Duration
	pullLock           sync.Mutex
	HealthChecker      health.HealthChecker
}

type manifestUpdate struct {
	source    string
	manifests []api.ContainerManifest
}

const (
	fileSource       = "file"
	etcdSource       = "etcd"
	httpClientSource = "http_client"
	httpServerSource = "http_server"
)

// RunKubelet starts background goroutines. If configPath, manifestURL, or address are empty,
// they are not watched. Never returns.
func (kl *Kubelet) RunKubelet(dockerEndpoint, configPath, manifestURL, etcdServers, address string, port uint) {
	if kl.CadvisorClient == nil {
		var err error
		kl.CadvisorClient, err = cadvisor.NewClient("http://127.0.0.1:5000")
		if err != nil {
			glog.Errorf("Error on creating cadvisor client: %v", err)
		}
	}
	if kl.DockerPuller == nil {
		kl.DockerPuller = NewDockerPuller(kl.DockerClient)
	}
	updateChannel := make(chan manifestUpdate)
	if configPath != "" {
		glog.Infof("Watching for file configs at %s", configPath)
		go util.Forever(func() {
			kl.WatchFiles(configPath, updateChannel)
		}, kl.FileCheckFrequency)
	}
	if manifestURL != "" {
		glog.Infof("Watching for HTTP configs at %s", manifestURL)
		go util.Forever(func() {
			if err := kl.extractFromHTTP(manifestURL, updateChannel); err != nil {
				glog.Errorf("Error syncing http: %v", err)
			}
		}, kl.HTTPCheckFrequency)
	}
	if etcdServers != "" {
		servers := []string{etcdServers}
		glog.Infof("Watching for etcd configs at %v", servers)
		kl.EtcdClient = etcd.NewClient(servers)
		go util.Forever(func() { kl.SyncAndSetupEtcdWatch(updateChannel) }, 20*time.Second)
	}
	if address != "" {
		glog.Infof("Starting to listen on %s:%d", address, port)
		handler := Server{
			Kubelet:         kl,
			UpdateChannel:   updateChannel,
			DelegateHandler: http.DefaultServeMux,
		}
		s := &http.Server{
			Addr:           net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)),
			Handler:        &handler,
			ReadTimeout:    10 * time.Second,
			WriteTimeout:   10 * time.Second,
			MaxHeaderBytes: 1 << 20,
		}
		go util.Forever(func() { s.ListenAndServe() }, 0)
	}
	kl.HealthChecker = health.MakeHealthChecker()
	kl.syncLoop(updateChannel, kl)
}

// SyncHandler is an interface implemented by Kubelet, for testability.
type SyncHandler interface {
	SyncManifests([]api.ContainerManifest) error
}

// LogEvent logs an event to the etcd backend.
func (kl *Kubelet) LogEvent(event *api.Event) error {
	if kl.EtcdClient == nil {
		return fmt.Errorf("no etcd client connection")
	}
	event.Timestamp = time.Now().Unix()
	data, err := json.Marshal(event)
	if err != nil {
		return err
	}
	var response *etcd.Response
	response, err = kl.EtcdClient.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */)
	// TODO(bburns) : examine response here.
	if err != nil {
		glog.Errorf("Error writing event: %s\n", err)
		if response != nil {
			glog.Infof("Response was: %v\n", *response)
		}
	}
	return err
}

// Return a map of docker containers that we manage. The map key is the docker container ID.
func (kl *Kubelet) getDockerContainers() (map[DockerID]docker.APIContainers, error) {
	result := map[DockerID]docker.APIContainers{}
	containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
	if err != nil {
		return nil, err
	}
	for _, value := range containerList {
		// Skip containers that we didn't create to allow users to manually
		// spin up their own containers if they want.
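		// For illustration: kubelet-managed containers carry names of roughly
		// the form "/k8s--<container>--<manifest id>--<random>"; the exact
		// format is defined by buildDockerName/parseDockerName in this package.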
		if !strings.HasPrefix(value.Names[0], "/"+containerNamePrefix+"--") {
			continue
		}
		result[DockerID(value.ID)] = value
	}
	return result, nil
}

// Return Docker's container ID for a manifest's container. Returns an empty string if it doesn't exist.
func (kl *Kubelet) getContainerID(manifest *api.ContainerManifest, container *api.Container) (DockerID, error) {
	dockerContainers, err := kl.getDockerContainers()
	if err != nil {
		return "", err
	}
	for id, dockerContainer := range dockerContainers {
		manifestID, containerName := parseDockerName(dockerContainer.Names[0])
		if manifestID == manifest.ID && containerName == container.Name {
			return DockerID(id), nil
		}
	}
	return "", nil
}

// Return the docker container with the given ID, or nil if it isn't found.
func (kl *Kubelet) getContainer(ID DockerID) (*docker.APIContainers, error) {
	dockerContainers, err := kl.getDockerContainers()
	if err != nil {
		return nil, err
	}
	for dockerID, dockerContainer := range dockerContainers {
		if dockerID == ID {
			return &dockerContainer, nil
		}
	}
	return nil, nil
}

func makeEnvironmentVariables(container *api.Container) []string {
	var result []string
	for _, value := range container.Env {
		result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value))
	}
	return result
}
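
// makeVolumesAndBinds, for illustration: given manifest ID "mypod", a mount of
// volume "data" at /var/data yields the bind "/exports/mypod/data:/var/data",
// while a HOST mount of /var/log yields "/var/log:/var/log"; ":ro" is appended
// for read-only mounts.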
func makeVolumesAndBinds(manifestID string, container *api.Container) (map[string]struct{}, []string) {
	volumes := map[string]struct{}{}
	binds := []string{}
	for _, volume := range container.VolumeMounts {
		var basePath string
		if volume.MountType == "HOST" {
			// Host volumes are not Docker volumes and are directly mounted from the host.
			basePath = fmt.Sprintf("%s:%s", volume.MountPath, volume.MountPath)
		} else {
			volumes[volume.MountPath] = struct{}{}
			basePath = fmt.Sprintf("/exports/%s/%s:%s", manifestID, volume.Name, volume.MountPath)
		}
		if volume.ReadOnly {
			basePath += ":ro"
		}
		binds = append(binds, basePath)
	}
	return volumes, binds
}

func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
	exposedPorts := map[docker.Port]struct{}{}
	portBindings := map[docker.Port][]docker.PortBinding{}
	for _, port := range container.Ports {
		interiorPort := port.ContainerPort
		exteriorPort := port.HostPort
		// Some of this port stuff is under-documented voodoo.
		// See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api
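		// For example, ContainerPort 8080 over TCP becomes the docker.Port key
		// "8080/tcp", bound on the host to the requested HostPort and HostIP.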
		var protocol string
		switch strings.ToUpper(port.Protocol) {
		case "UDP":
			protocol = "/udp"
		case "TCP":
			protocol = "/tcp"
		default:
			glog.Infof("Unknown protocol '%s': defaulting to TCP", port.Protocol)
			protocol = "/tcp"
		}
		dockerPort := docker.Port(strconv.Itoa(interiorPort) + protocol)
		exposedPorts[dockerPort] = struct{}{}
		portBindings[dockerPort] = []docker.PortBinding{
			{
				HostPort: strconv.Itoa(exteriorPort),
				HostIp:   port.HostIP,
			},
		}
	}
	return exposedPorts, portBindings
}
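
// milliCpuToShares converts a CPU request in milli-CPUs to Docker CPU shares.
// For example, 250 milli-CPUs map to (250 * 1024) / 1000 = 256 shares; very
// small requests are rounded up to the lmctfy minimum of minShares (2).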
func milliCpuToShares(milliCpu int) int {
	// Conceptually (milliCpu / milliCpuToCpu) * sharesPerCpu, but factored to improve rounding.
	shares := (milliCpu * sharesPerCpu) / milliCpuToCpu
	if shares < minShares {
		return minShares
	}
	return shares
}

// Run a single container from a manifest. Returns the docker container ID.
func (kl *Kubelet) runContainer(manifest *api.ContainerManifest, container *api.Container, netMode string) (id DockerID, err error) {
	envVariables := makeEnvironmentVariables(container)
	volumes, binds := makeVolumesAndBinds(manifest.ID, container)
	exposedPorts, portBindings := makePortsAndBindings(container)
	opts := docker.CreateContainerOptions{
		Name: buildDockerName(manifest, container),
		Config: &docker.Config{
			Cmd:          container.Command,
			Env:          envVariables,
			ExposedPorts: exposedPorts,
			Hostname:     container.Name,
			Image:        container.Image,
			Memory:       int64(container.Memory),
			CpuShares:    int64(milliCpuToShares(container.CPU)),
			Volumes:      volumes,
			WorkingDir:   container.WorkingDir,
		},
	}
	dockerContainer, err := kl.DockerClient.CreateContainer(opts)
	if err != nil {
		return "", err
	}
	err = kl.DockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{
		PortBindings: portBindings,
		Binds:        binds,
		NetworkMode:  netMode,
	})
	return DockerID(dockerContainer.ID), err
}

// Kill a docker container.
func (kl *Kubelet) killContainer(container docker.APIContainers) error {
	err := kl.DockerClient.StopContainer(container.ID, 10)
	manifestID, containerName := parseDockerName(container.Names[0])
	kl.LogEvent(&api.Event{
		Event: "STOP",
		Manifest: &api.ContainerManifest{
			ID: manifestID,
		},
		Container: &api.Container{
			Name: containerName,
		},
	})
	return err
}

func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) {
	var file *os.File
	var err error
	var manifest api.ContainerManifest

	if file, err = os.Open(name); err != nil {
		return manifest, err
	}
	defer file.Close()

	data, err := ioutil.ReadAll(file)
	if err != nil {
		glog.Errorf("Couldn't read from file: %v", err)
		return manifest, err
	}
	if err = kl.ExtractYAMLData(data, &manifest); err != nil {
		return manifest, err
	}
	return manifest, nil
}

func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) {
	var manifests []api.ContainerManifest

	// Glob for [^.]* so that dot-files in the config directory are ignored.
	files, err := filepath.Glob(filepath.Join(name, "[^.]*"))
	if err != nil {
		return manifests, err
	}

	sort.Strings(files)

	for _, file := range files {
		manifest, err := kl.extractFromFile(file)
		if err != nil {
			glog.Errorf("Couldn't read from file %s: %v", file, err)
			return manifests, err
		}
		manifests = append(manifests, manifest)
	}
	return manifests, nil
}

// WatchFiles watches a file or directory of files for changes to the set of pods that
// should run on this Kubelet.
func (kl *Kubelet) WatchFiles(configPath string, updateChannel chan<- manifestUpdate) {
	statInfo, err := os.Stat(configPath)
	if err != nil {
		if !os.IsNotExist(err) {
			glog.Errorf("Error accessing path: %v", err)
		}
		return
	}
	if statInfo.Mode().IsDir() {
		manifests, err := kl.extractFromDir(configPath)
		if err != nil {
			glog.Errorf("Error polling dir: %v", err)
			return
		}
		updateChannel <- manifestUpdate{fileSource, manifests}
	} else if statInfo.Mode().IsRegular() {
		manifest, err := kl.extractFromFile(configPath)
		if err != nil {
			glog.Errorf("Error polling file: %v", err)
			return
		}
		updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}}
	} else {
		glog.Errorf("Error accessing config - not a directory or file")
		return
	}
}

func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpdate) error {
	request, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return err
	}
	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()
	data, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return err
	}
	if len(data) == 0 {
		return fmt.Errorf("zero-length data received from %v", url)
	}

	// First try as if it's a single manifest
	var manifest api.ContainerManifest
	singleErr := yaml.Unmarshal(data, &manifest)
	if singleErr == nil && manifest.Version == "" {
		// If data is a []ContainerManifest, trying to put it into a ContainerManifest
		// will not give an error but also won't set any of the fields.
		// Our docs say that the version field is mandatory, so we use that to judge
		// whether this was actually successful.
		singleErr = fmt.Errorf("got blank version field")
	}
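	// For illustration (field names indicative only), a single-manifest body like
	//   version: v1beta1
	//   id: mypod
	//   containers: [...]
	// unmarshals with Version set, while a YAML list of manifests leaves every
	// field of the single ContainerManifest blank, including Version.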
	if singleErr == nil {
		updateChannel <- manifestUpdate{httpClientSource, []api.ContainerManifest{manifest}}
		return nil
	}

	// That didn't work, so try an array of manifests.
	var manifests []api.ContainerManifest
	multiErr := yaml.Unmarshal(data, &manifests)
	// We're not sure if the person reading the logs is going to care about the single or
	// multiple manifest unmarshalling attempt, so we need to put both in the logs, as is
	// done at the end. Hence not returning early here.
	if multiErr == nil && len(manifests) > 0 && manifests[0].Version == "" {
		multiErr = fmt.Errorf("got blank version field")
	}
	if multiErr == nil {
		updateChannel <- manifestUpdate{httpClientSource, manifests}
		return nil
	}
	return fmt.Errorf("%v: received '%v', but couldn't parse as a "+
		"single manifest (%v: %+v) or as multiple manifests (%v: %+v)",
		url, string(data), singleErr, manifest, multiErr, manifests)
}

// ResponseToManifests takes an etcd Response object, and turns it into a structured list of containers.
// It returns a list of containers, or an error if one occurs.
func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
	if response.Node == nil || len(response.Node.Value) == 0 {
		return nil, fmt.Errorf("no nodes field: %v", response)
	}
	var manifests []api.ContainerManifest
	err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
	return manifests, err
}

func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error {
	response, err := kl.EtcdClient.Get(key, true, false)
	if err != nil {
		if tools.IsEtcdNotFound(err) {
			return nil
		}
		glog.Errorf("Error on etcd get of %s: %v", key, err)
		return err
	}
	manifests, err := kl.ResponseToManifests(response)
	if err != nil {
		glog.Errorf("Error parsing response (%v): %s", response, err)
		return err
	}
	glog.Infof("Got state from etcd: %+v", manifests)
	updateChannel <- manifestUpdate{etcdSource, manifests}
	return nil
}

// SyncAndSetupEtcdWatch synchronizes with etcd, and sets up an etcd watch for new configurations.
// New configurations are sent across updateChannel.
// This function loops forever and is intended to be run in a go routine.
func (kl *Kubelet) SyncAndSetupEtcdWatch(updateChannel chan<- manifestUpdate) {
	key := path.Join("registry", "hosts", strings.TrimSpace(kl.Hostname), "kubelet")

	// First fetch the initial configuration (watch only gives changes...)
	for {
		err := kl.getKubeletStateFromEtcd(key, updateChannel)
		if err == nil {
			// We got a successful response, etcd is up, set up the watch.
			break
		}
		time.Sleep(30 * time.Second)
	}

	done := make(chan bool)
	go util.Forever(func() { kl.TimeoutWatch(done) }, 0)
	for {
		// The etcd client will close the watch channel when it exits. So we need
		// to create and service a new one every time.
		watchChannel := make(chan *etcd.Response)
		// We don't push this through Forever because if it dies, we just do it
		// again in 30 secs anyway.
		go kl.WatchEtcd(watchChannel, updateChannel)
		kl.getKubeletStateFromEtcd(key, updateChannel)
		glog.V(1).Infof("Setting up a watch for configuration changes in etcd for %s", key)
		kl.EtcdClient.Watch(key, 0, true, watchChannel, done)
	}
}

// TimeoutWatch times out the watch after 30 seconds.
func (kl *Kubelet) TimeoutWatch(done chan bool) {
	t := time.Tick(30 * time.Second)
	for range t {
		done <- true
	}
}

// ExtractYAMLData unmarshals YAML data into the given output object.
func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error {
	err := yaml.Unmarshal(buf, output)
	if err != nil {
		glog.Errorf("Couldn't unmarshal configuration: %v", err)
		return err
	}
	return nil
}

func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerManifest, error) {
	var manifests []api.ContainerManifest
	if response.Node == nil || len(response.Node.Value) == 0 {
		return manifests, fmt.Errorf("no nodes field: %v", response)
	}
	err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
	return manifests, err
}

// WatchEtcd watches etcd for changes, receives config objects from the etcd client watch.
// This function loops until the watchChannel is closed, and is intended to be run as a goroutine.
func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel chan<- manifestUpdate) {
	defer util.HandleCrash()
	for {
		watchResponse := <-watchChannel
		// This means the channel has been closed.
		if watchResponse == nil {
			return
		}
		glog.Infof("Got etcd change: %v", watchResponse)
		manifests, err := kl.extractFromEtcd(watchResponse)
		if err != nil {
			glog.Errorf("Error handling response from etcd: %v", err)
			continue
		}
		glog.Infof("manifests: %+v", manifests)
		// Ok, we have a valid configuration, send to channel for
		// rejiggering.
		updateChannel <- manifestUpdate{etcdSource, manifests}
	}
}

const networkContainerName = "net"

// Return the docker ID for a manifest's network container. Returns an empty string if it doesn't exist.
func (kl *Kubelet) getNetworkContainerID(manifest *api.ContainerManifest) (DockerID, error) {
	return kl.getContainerID(manifest, &api.Container{Name: networkContainerName})
}

// Create a network container for a manifest. Returns the docker container ID of the newly created container.
func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (DockerID, error) {
	var ports []api.Port
	// Docker only exports ports from the network container. Let's
	// collect all of the relevant ports and export them.
	for _, container := range manifest.Containers {
		ports = append(ports, container.Ports...)
	}
	container := &api.Container{
		Name:    networkContainerName,
		Image:   "busybox",
		Command: []string{"sh", "-c", "rm -f nap && mkfifo nap && exec cat nap"},
		Ports:   ports,
	}
	kl.DockerPuller.Pull("busybox")
	return kl.runContainer(manifest, container, "")
}

// syncManifest makes sure the manifest's network container and all of its
// containers are running, and reports the docker IDs to keep on keepChannel.
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel chan<- DockerID) error {
	// Make sure we have a network container
	netID, err := kl.getNetworkContainerID(manifest)
	if err != nil {
		glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID)
		return err
	}
	if netID == "" {
		glog.Infof("Network container doesn't exist, creating")
		netID, err = kl.createNetworkContainer(manifest)
		if err != nil {
			glog.Errorf("Failed to create network container. (%v) Skipping manifest %s", err, manifest.ID)
			return err
		}
	}
	keepChannel <- netID
	for _, container := range manifest.Containers {
		containerID, err := kl.getContainerID(manifest, &container)
		if err != nil {
			glog.Errorf("Error finding container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
			continue
		}
		if containerID == "" {
			glog.Infof("%+v doesn't exist, creating", container)
			err = kl.DockerPuller.Pull(container.Image)
			if err != nil {
				glog.Errorf("Failed to pull image %s: %v skipping manifest %s container %s.", container.Image, err, manifest.ID, container.Name)
				continue
			}
			containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID))
			if err != nil {
				// TODO(bburns) : Perhaps blacklist a container after N failures?
				glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err)
				continue
			}
		} else {
			glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
			dockerContainer, err := kl.getContainer(containerID)
			if err != nil || dockerContainer == nil {
				glog.V(1).Infof("Failed to get container info %v, for %s", err, containerID)
				continue
			}
			// TODO: This should probably be separated out into a separate goroutine.
			healthy, err := kl.healthy(container, dockerContainer)
			if err != nil {
				glog.V(1).Infof("health check errored: %v", err)
				continue
			}
			if healthy != health.Healthy {
				glog.V(1).Infof("manifest %s container %s is unhealthy.", manifest.ID, container.Name)
				err = kl.killContainer(*dockerContainer)
				if err != nil {
					glog.V(1).Infof("Failed to kill container %s: %v", containerID, err)
					continue
				}
				containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID))
				if err != nil {
					glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err)
					continue
				}
			}
		}
		keepChannel <- containerID
	}
	return nil
}

type empty struct{}

// SyncManifests synchronizes the configured list of containers (desired state) with the host's current state.
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
	glog.Infof("Desired: %+v", config)
	var err error
	dockerIdsToKeep := map[DockerID]empty{}
	keepChannel := make(chan DockerID, defaultChanSize)
	waitGroup := sync.WaitGroup{}

	// Check for any containers that need starting
	for ix := range config {
		waitGroup.Add(1)
		go func(index int) {
			defer util.HandleCrash()
			defer waitGroup.Done()
			// We have to dereference by index here, because otherwise the shared
			// loop variable would be reused across goroutines.
			err := kl.syncManifest(&config[index], keepChannel)
			if err != nil {
				glog.Errorf("Error syncing manifest: %v skipping.", err)
			}
		}(ix)
	}
	ch := make(chan bool)
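	// Drain keepChannel into the map from a single goroutine so that writes to
	// the map are serialized; ch signals once keepChannel is closed and drained.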
	go func() {
		for id := range keepChannel {
			dockerIdsToKeep[id] = empty{}
		}
		ch <- true
	}()
	if len(config) > 0 {
		waitGroup.Wait()
	}
	close(keepChannel)
	<-ch

	// Kill any containers we don't need
	existingContainers, err := kl.getDockerContainers()
	if err != nil {
		glog.Errorf("Error listing containers: %v", err)
		return err
	}
	for id, container := range existingContainers {
		if _, ok := dockerIdsToKeep[id]; !ok {
			glog.Infof("Killing: %s", id)
			err = kl.killContainer(container)
			if err != nil {
				glog.Errorf("Error killing container: %v", err)
			}
		}
	}
	return err
}
// Check that all Port.HostPort values are unique across all manifests.
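// For example, if a manifest in allManifests already claims HostPort 8080 and
// newManifest requests 8080 again, AccumulateUniquePorts reports a duplicate
// port error for the second claim.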
func checkHostPortConflicts(allManifests []api.ContainerManifest, newManifest *api.ContainerManifest) []error {
	allErrs := []error{}
	allPorts := map[int]bool{}
	extract := func(p *api.Port) int { return p.HostPort }
	for i := range allManifests {
		manifest := &allManifests[i]
		errs := api.AccumulateUniquePorts(manifest.Containers, allPorts, extract)
		if len(errs) != 0 {
			allErrs = append(allErrs, errs...)
		}
	}
	if errs := api.AccumulateUniquePorts(newManifest.Containers, allPorts, extract); len(errs) != 0 {
		allErrs = append(allErrs, errs...)
	}
	return allErrs
}

// syncLoop is the main loop for processing changes. It watches for changes from
// four channels (file, etcd, server, and http) and creates a union of them. For
// any new change seen, it runs a sync against desired state and running state. If
// no changes are seen to the configuration, it synchronizes the last known desired
// state every sync_frequency seconds.
// Never returns.
func (kl *Kubelet) syncLoop(updateChannel <-chan manifestUpdate, handler SyncHandler) {
	last := make(map[string][]api.ContainerManifest)
	for {
		select {
		case u := <-updateChannel:
			glog.Infof("Got configuration from %s: %+v", u.source, u.manifests)
			last[u.source] = u.manifests
		case <-time.After(kl.SyncFrequency):
		}

		allManifests := []api.ContainerManifest{}
		allIds := util.StringSet{}
		for src, srcManifests := range last {
			for i := range srcManifests {
				allErrs := []error{}

				m := &srcManifests[i]
				if allIds.Has(m.ID) {
					allErrs = append(allErrs, api.ValidationError{api.ErrTypeDuplicate, "ContainerManifest.ID", m.ID})
				} else {
					allIds.Insert(m.ID)
				}
				if errs := api.ValidateManifest(m); len(errs) != 0 {
					allErrs = append(allErrs, errs...)
				}
				// Check for host-wide HostPort conflicts.
				if errs := checkHostPortConflicts(allManifests, m); len(errs) != 0 {
					allErrs = append(allErrs, errs...)
				}
				if len(allErrs) > 0 {
					glog.Warningf("Manifest from %s failed validation, ignoring: %v", src, allErrs)
				}
			}
			// TODO(thockin): There's no reason to collect manifests by value. Don't pessimize.
			allManifests = append(allManifests, srcManifests...)
		}

		err := handler.SyncManifests(allManifests)
		if err != nil {
			glog.Errorf("Couldn't sync containers: %v", err)
		}
	}
}

// getContainerIDFromName looks at the list of containers on the machine and returns the ID of the container whose name
// matches 'name'. It returns the docker ID of the container (or the empty string if it isn't found),
// true if the container is found (false otherwise), and any error that occurs.
// TODO: This function exists to support GetContainerInfo and GetContainerStats.
// It should be removed once those two functions start taking proper pod.IDs
func (kl *Kubelet) getContainerIDFromName(name string) (DockerID, bool, error) {
	containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
	if err != nil {
		return "", false, err
	}
	for _, value := range containerList {
		if strings.Contains(value.Names[0], name) {
			return DockerID(value.ID), true, nil
		}
	}
	return "", false, nil
}

// GetPodInfo returns docker info for all containers in the pod/manifest.
func (kl *Kubelet) GetPodInfo(podID string) (api.PodInfo, error) {
	info := api.PodInfo{}
	containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
	if err != nil {
		return nil, err
	}
	for _, value := range containerList {
		manifestID, containerName := parseDockerName(value.Names[0])
		if manifestID != podID {
			continue
		}
		inspectResult, err := kl.DockerClient.InspectContainer(value.ID)
		if err != nil {
			return nil, err
		}
		if inspectResult == nil {
			// Why did we not get an error?
			info[containerName] = docker.Container{}
		} else {
			info[containerName] = *inspectResult
		}
	}
	return info, nil
}

// Returns the docker ID corresponding to a pod-id-container-name pair.
func (kl *Kubelet) getDockerIDFromPodIDAndContainerName(podID, containerName string) (DockerID, error) {
	containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
	if err != nil {
		return "", err
	}
	for _, value := range containerList {
		manifestID, cName := parseDockerName(value.Names[0])
		if manifestID == podID && cName == containerName {
			return DockerID(value.ID), nil
		}
	}
	return "", errors.New("couldn't find container")
}

func getCadvisorContainerInfoRequest(req *info.ContainerInfoRequest) *info.ContainerInfoRequest {
	ret := &info.ContainerInfoRequest{
		NumStats:               req.NumStats,
		CpuUsagePercentiles:    req.CpuUsagePercentiles,
		MemoryUsagePercentages: req.MemoryUsagePercentages,
	}
	return ret
}

// This method takes a container's absolute path and returns the stats for the
// container. The container's absolute path refers to its hierarchy in the
// cgroup file system. e.g. The root container, which represents the whole
// machine, has path "/"; all docker containers have path "/docker/<docker id>"
func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
	cinfo, err := kl.CadvisorClient.ContainerInfo(containerPath, getCadvisorContainerInfoRequest(req))
	if err != nil {
		return nil, err
	}
	return cinfo, nil
}

// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
	if kl.CadvisorClient == nil {
		return nil, nil
	}
	dockerID, err := kl.getDockerIDFromPodIDAndContainerName(podID, containerName)
	if err != nil || len(dockerID) == 0 {
		return nil, err
	}
	return kl.statsFromContainerPath(fmt.Sprintf("/docker/%s", string(dockerID)), req)
}

// GetMachineStats returns stats (from Cadvisor) of current machine.
func (kl *Kubelet) GetMachineStats(req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
	return kl.statsFromContainerPath("/", req)
}

func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (health.Status, error) {
	// Containers without a liveness probe are assumed healthy.
	if container.LivenessProbe == nil {
		return health.Healthy, nil
	}
	// Give the container time to start up: within the probe's initial delay,
	// report healthy without checking.
	if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
		return health.Healthy, nil
	}
	if kl.HealthChecker == nil {
		return health.Healthy, nil
	}
	return kl.HealthChecker.HealthCheck(container)
}