2014-06-06 23:40:48 +00:00
|
|
|
/*
|
|
|
|
Copyright 2014 Google Inc. All rights reserved.
|
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package kubelet
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
2014-07-01 21:05:10 +00:00
|
|
|
"errors"
|
2014-06-06 23:40:48 +00:00
|
|
|
"fmt"
|
|
|
|
"io/ioutil"
|
2014-06-26 22:00:55 +00:00
|
|
|
"net"
|
2014-06-06 23:40:48 +00:00
|
|
|
"net/http"
|
2014-06-09 20:47:25 +00:00
|
|
|
"os"
|
2014-06-27 01:14:13 +00:00
|
|
|
"path"
|
2014-06-19 20:06:52 +00:00
|
|
|
"path/filepath"
|
2014-06-19 22:14:57 +00:00
|
|
|
"sort"
|
2014-06-06 23:40:48 +00:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
2014-07-15 00:00:23 +00:00
|
|
|
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
|
2014-06-30 19:00:14 +00:00
|
|
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
2014-06-06 23:40:48 +00:00
|
|
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
|
|
|
"github.com/coreos/go-etcd/etcd"
|
|
|
|
"github.com/fsouza/go-dockerclient"
|
2014-06-25 03:51:57 +00:00
|
|
|
"github.com/golang/glog"
|
2014-07-08 23:37:43 +00:00
|
|
|
"github.com/google/cadvisor/client"
|
2014-06-19 00:31:18 +00:00
|
|
|
"github.com/google/cadvisor/info"
|
2014-06-06 23:40:48 +00:00
|
|
|
"gopkg.in/v1/yaml"
|
|
|
|
)
|
|
|
|
|
2014-07-14 20:12:44 +00:00
|
|
|
const defaultChanSize = 1024
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// CadvisorInterface is an abstract interface for testability. It abstracts the interface of "github.com/google/cadvisor/client".Client.
|
2014-06-19 00:31:18 +00:00
|
|
|
type CadvisorInterface interface {
|
|
|
|
ContainerInfo(name string) (*info.ContainerInfo, error)
|
|
|
|
MachineInfo() (*info.MachineInfo, error)
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// New creates a new Kubelet.
|
2014-07-01 16:15:49 +00:00
|
|
|
func New() *Kubelet {
|
|
|
|
return &Kubelet{}
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// Kubelet is the main kubelet implementation.
|
2014-06-06 23:40:48 +00:00
|
|
|
type Kubelet struct {
|
2014-06-12 01:57:15 +00:00
|
|
|
Hostname string
|
2014-06-30 19:00:14 +00:00
|
|
|
EtcdClient tools.EtcdClient
|
2014-06-06 23:40:48 +00:00
|
|
|
DockerClient DockerInterface
|
2014-06-24 23:31:33 +00:00
|
|
|
DockerPuller DockerPuller
|
2014-06-19 00:31:18 +00:00
|
|
|
CadvisorClient CadvisorInterface
|
2014-06-06 23:40:48 +00:00
|
|
|
FileCheckFrequency time.Duration
|
|
|
|
SyncFrequency time.Duration
|
|
|
|
HTTPCheckFrequency time.Duration
|
|
|
|
pullLock sync.Mutex
|
2014-07-03 05:35:50 +00:00
|
|
|
HealthChecker HealthChecker
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2014-06-21 21:20:35 +00:00
|
|
|
type manifestUpdate struct {
|
|
|
|
source string
|
|
|
|
manifests []api.ContainerManifest
|
|
|
|
}
|
|
|
|
|
|
|
|
const (
|
|
|
|
fileSource = "file"
|
|
|
|
etcdSource = "etcd"
|
|
|
|
httpClientSource = "http_client"
|
|
|
|
httpServerSource = "http_server"
|
|
|
|
)
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// RunKubelet starts background goroutines. If config_path, manifest_url, or address are empty,
|
2014-06-06 23:40:48 +00:00
|
|
|
// they are not watched. Never returns.
|
2014-07-10 12:26:24 +00:00
|
|
|
func (kl *Kubelet) RunKubelet(dockerEndpoint, configPath, manifestURL, etcdServers, address string, port uint) {
|
2014-07-08 23:37:43 +00:00
|
|
|
if kl.CadvisorClient == nil {
|
|
|
|
var err error
|
|
|
|
kl.CadvisorClient, err = cadvisor.NewClient("http://127.0.0.1:5000")
|
|
|
|
if err != nil {
|
|
|
|
glog.Errorf("Error on creating cadvisor client: %v", err)
|
|
|
|
}
|
|
|
|
}
|
2014-06-24 23:31:33 +00:00
|
|
|
if kl.DockerPuller == nil {
|
2014-07-15 15:34:48 +00:00
|
|
|
kl.DockerPuller = NewDockerPuller(kl.DockerClient)
|
2014-06-24 23:31:33 +00:00
|
|
|
}
|
2014-06-21 21:20:35 +00:00
|
|
|
updateChannel := make(chan manifestUpdate)
|
2014-07-10 12:26:24 +00:00
|
|
|
if configPath != "" {
|
|
|
|
glog.Infof("Watching for file configs at %s", configPath)
|
2014-06-24 18:15:32 +00:00
|
|
|
go util.Forever(func() {
|
2014-07-10 12:26:24 +00:00
|
|
|
kl.WatchFiles(configPath, updateChannel)
|
2014-06-24 18:15:32 +00:00
|
|
|
}, kl.FileCheckFrequency)
|
2014-06-20 16:31:18 +00:00
|
|
|
}
|
2014-07-10 12:26:24 +00:00
|
|
|
if manifestURL != "" {
|
|
|
|
glog.Infof("Watching for HTTP configs at %s", manifestURL)
|
2014-06-23 22:06:28 +00:00
|
|
|
go util.Forever(func() {
|
2014-07-10 12:26:24 +00:00
|
|
|
if err := kl.extractFromHTTP(manifestURL, updateChannel); err != nil {
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Errorf("Error syncing http: %v", err)
|
2014-06-23 22:06:28 +00:00
|
|
|
}
|
|
|
|
}, kl.HTTPCheckFrequency)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-07-10 12:26:24 +00:00
|
|
|
if etcdServers != "" {
|
|
|
|
servers := []string{etcdServers}
|
2014-06-25 03:51:57 +00:00
|
|
|
glog.Infof("Watching for etcd configs at %v", servers)
|
2014-06-24 20:17:38 +00:00
|
|
|
kl.EtcdClient = etcd.NewClient(servers)
|
2014-06-21 21:20:35 +00:00
|
|
|
go util.Forever(func() { kl.SyncAndSetupEtcdWatch(updateChannel) }, 20*time.Second)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
if address != "" {
|
2014-06-25 03:51:57 +00:00
|
|
|
glog.Infof("Starting to listen on %s:%d", address, port)
|
2014-06-06 23:40:48 +00:00
|
|
|
handler := KubeletServer{
|
2014-07-15 00:00:23 +00:00
|
|
|
Kubelet: kl,
|
|
|
|
UpdateChannel: updateChannel,
|
|
|
|
DelegateHandler: http.DefaultServeMux,
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
s := &http.Server{
|
2014-06-26 22:00:55 +00:00
|
|
|
Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)),
|
2014-06-06 23:40:48 +00:00
|
|
|
Handler: &handler,
|
|
|
|
ReadTimeout: 10 * time.Second,
|
|
|
|
WriteTimeout: 10 * time.Second,
|
|
|
|
MaxHeaderBytes: 1 << 20,
|
|
|
|
}
|
|
|
|
go util.Forever(func() { s.ListenAndServe() }, 0)
|
|
|
|
}
|
2014-07-03 05:35:50 +00:00
|
|
|
kl.HealthChecker = MakeHealthChecker()
|
2014-07-01 20:01:39 +00:00
|
|
|
kl.syncLoop(updateChannel, kl)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// SyncHandler is an interface implemented by Kubelet, for testability
|
2014-06-06 23:40:48 +00:00
|
|
|
type SyncHandler interface {
|
|
|
|
SyncManifests([]api.ContainerManifest) error
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// LogEvent logs an event to the etcd backend.
|
2014-06-09 03:35:07 +00:00
|
|
|
func (kl *Kubelet) LogEvent(event *api.Event) error {
|
2014-06-24 20:17:38 +00:00
|
|
|
if kl.EtcdClient == nil {
|
2014-07-10 12:26:24 +00:00
|
|
|
return fmt.Errorf("no etcd client connection")
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
event.Timestamp = time.Now().Unix()
|
|
|
|
data, err := json.Marshal(event)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
var response *etcd.Response
|
2014-06-24 20:17:38 +00:00
|
|
|
response, err = kl.EtcdClient.AddChild(fmt.Sprintf("/events/%s", event.Container.Name), string(data), 60*60*48 /* 2 days */)
|
2014-06-06 23:40:48 +00:00
|
|
|
// TODO(bburns) : examine response here.
|
|
|
|
if err != nil {
|
2014-06-25 03:51:57 +00:00
|
|
|
glog.Errorf("Error writing event: %s\n", err)
|
2014-06-06 23:40:48 +00:00
|
|
|
if response != nil {
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Infof("Response was: %v\n", *response)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2014-06-25 23:24:20 +00:00
|
|
|
// Return a map of docker containers that we manage. The map key is the docker container ID
|
2014-07-02 18:21:29 +00:00
|
|
|
func (kl *Kubelet) getDockerContainers() (map[DockerID]docker.APIContainers, error) {
|
|
|
|
result := map[DockerID]docker.APIContainers{}
|
2014-06-09 03:35:07 +00:00
|
|
|
containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
2014-07-12 18:03:08 +00:00
|
|
|
return nil, err
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
for _, value := range containerList {
|
2014-06-25 23:24:20 +00:00
|
|
|
// Skip containers that we didn't create to allow users to manually
|
|
|
|
// spin up their own containers if they want.
|
|
|
|
if !strings.HasPrefix(value.Names[0], "/"+containerNamePrefix+"--") {
|
|
|
|
continue
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-07-02 18:21:29 +00:00
|
|
|
result[DockerID(value.ID)] = value
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-07-12 18:03:08 +00:00
|
|
|
return result, nil
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2014-06-27 19:09:53 +00:00
|
|
|
// Return Docker's container ID for a manifest's container. Returns an empty string if it doesn't exist.
|
2014-07-02 18:21:29 +00:00
|
|
|
func (kl *Kubelet) getContainerID(manifest *api.ContainerManifest, container *api.Container) (DockerID, error) {
|
2014-06-25 23:24:20 +00:00
|
|
|
dockerContainers, err := kl.getDockerContainers()
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
2014-06-25 23:24:20 +00:00
|
|
|
return "", err
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-06-25 23:24:20 +00:00
|
|
|
for id, dockerContainer := range dockerContainers {
|
2014-07-02 18:21:29 +00:00
|
|
|
manifestID, containerName := parseDockerName(dockerContainer.Names[0])
|
|
|
|
if manifestID == manifest.ID && containerName == container.Name {
|
|
|
|
return DockerID(id), nil
|
2014-06-25 23:24:20 +00:00
|
|
|
}
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-06-25 23:24:20 +00:00
|
|
|
return "", nil
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2014-07-03 05:35:50 +00:00
|
|
|
func (kl *Kubelet) getContainer(ID DockerID) (*docker.APIContainers, error) {
|
|
|
|
dockerContainers, err := kl.getDockerContainers()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
for dockerID, dockerContainer := range dockerContainers {
|
|
|
|
if dockerID == ID {
|
|
|
|
return &dockerContainer, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
2014-06-09 20:47:25 +00:00
|
|
|
func makeEnvironmentVariables(container *api.Container) []string {
|
|
|
|
var result []string
|
2014-06-06 23:40:48 +00:00
|
|
|
for _, value := range container.Env {
|
2014-06-09 20:47:25 +00:00
|
|
|
result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value))
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-06-09 20:47:25 +00:00
|
|
|
return result
|
|
|
|
}
|
2014-06-06 23:40:48 +00:00
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
func makeVolumesAndBinds(manifestID string, container *api.Container) (map[string]struct{}, []string) {
|
2014-06-06 23:40:48 +00:00
|
|
|
volumes := map[string]struct{}{}
|
|
|
|
binds := []string{}
|
|
|
|
for _, volume := range container.VolumeMounts {
|
2014-06-19 23:59:48 +00:00
|
|
|
var basePath string
|
|
|
|
if volume.MountType == "HOST" {
|
|
|
|
// Host volumes are not Docker volumes and are directly mounted from the host.
|
|
|
|
basePath = fmt.Sprintf("%s:%s", volume.MountPath, volume.MountPath)
|
|
|
|
} else {
|
|
|
|
volumes[volume.MountPath] = struct{}{}
|
2014-07-10 12:26:24 +00:00
|
|
|
basePath = fmt.Sprintf("/exports/%s/%s:%s", manifestID, volume.Name, volume.MountPath)
|
2014-06-19 23:59:48 +00:00
|
|
|
}
|
2014-06-06 23:40:48 +00:00
|
|
|
if volume.ReadOnly {
|
|
|
|
basePath += ":ro"
|
|
|
|
}
|
|
|
|
binds = append(binds, basePath)
|
|
|
|
}
|
2014-06-09 20:47:25 +00:00
|
|
|
return volumes, binds
|
|
|
|
}
|
2014-06-06 23:40:48 +00:00
|
|
|
|
2014-06-09 20:47:25 +00:00
|
|
|
func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
|
2014-06-06 23:40:48 +00:00
|
|
|
exposedPorts := map[docker.Port]struct{}{}
|
|
|
|
portBindings := map[docker.Port][]docker.PortBinding{}
|
|
|
|
for _, port := range container.Ports {
|
|
|
|
interiorPort := port.ContainerPort
|
|
|
|
exteriorPort := port.HostPort
|
|
|
|
// Some of this port stuff is under-documented voodoo.
|
|
|
|
// See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api
|
2014-06-16 04:19:35 +00:00
|
|
|
var protocol string
|
2014-07-08 04:32:56 +00:00
|
|
|
switch strings.ToUpper(port.Protocol) {
|
|
|
|
case "UDP":
|
2014-06-16 04:19:35 +00:00
|
|
|
protocol = "/udp"
|
2014-07-08 04:32:56 +00:00
|
|
|
case "TCP":
|
2014-06-16 04:19:35 +00:00
|
|
|
protocol = "/tcp"
|
|
|
|
default:
|
2014-07-08 04:32:56 +00:00
|
|
|
glog.Infof("Unknown protocol '%s': defaulting to TCP", port.Protocol)
|
2014-06-16 04:19:35 +00:00
|
|
|
protocol = "/tcp"
|
|
|
|
}
|
|
|
|
dockerPort := docker.Port(strconv.Itoa(interiorPort) + protocol)
|
2014-06-06 23:40:48 +00:00
|
|
|
exposedPorts[dockerPort] = struct{}{}
|
|
|
|
portBindings[dockerPort] = []docker.PortBinding{
|
2014-06-12 21:09:40 +00:00
|
|
|
{
|
2014-06-06 23:40:48 +00:00
|
|
|
HostPort: strconv.Itoa(exteriorPort),
|
2014-07-09 05:44:15 +00:00
|
|
|
HostIp: port.HostIP,
|
2014-06-06 23:40:48 +00:00
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
2014-06-09 20:47:25 +00:00
|
|
|
return exposedPorts, portBindings
|
|
|
|
}
|
|
|
|
|
2014-06-25 23:24:20 +00:00
|
|
|
// Run a single container from a manifest. Returns the docker container ID
|
2014-07-02 18:21:29 +00:00
|
|
|
func (kl *Kubelet) runContainer(manifest *api.ContainerManifest, container *api.Container, netMode string) (id DockerID, err error) {
|
2014-06-09 20:47:25 +00:00
|
|
|
envVariables := makeEnvironmentVariables(container)
|
2014-07-09 18:39:03 +00:00
|
|
|
volumes, binds := makeVolumesAndBinds(manifest.ID, container)
|
2014-06-09 20:47:25 +00:00
|
|
|
exposedPorts, portBindings := makePortsAndBindings(container)
|
|
|
|
|
2014-06-06 23:40:48 +00:00
|
|
|
opts := docker.CreateContainerOptions{
|
2014-06-27 19:09:53 +00:00
|
|
|
Name: buildDockerName(manifest, container),
|
2014-06-06 23:40:48 +00:00
|
|
|
Config: &docker.Config{
|
2014-07-15 03:56:18 +00:00
|
|
|
Cmd: container.Command,
|
|
|
|
Env: envVariables,
|
|
|
|
ExposedPorts: exposedPorts,
|
2014-07-10 19:25:57 +00:00
|
|
|
Hostname: container.Name,
|
2014-06-06 23:40:48 +00:00
|
|
|
Image: container.Image,
|
2014-07-15 03:56:18 +00:00
|
|
|
Memory: int64(container.Memory),
|
2014-06-06 23:40:48 +00:00
|
|
|
Volumes: volumes,
|
|
|
|
WorkingDir: container.WorkingDir,
|
|
|
|
},
|
|
|
|
}
|
2014-06-09 03:35:07 +00:00
|
|
|
dockerContainer, err := kl.DockerClient.CreateContainer(opts)
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
2014-06-25 23:24:20 +00:00
|
|
|
err = kl.DockerClient.StartContainer(dockerContainer.ID, &docker.HostConfig{
|
2014-06-06 23:40:48 +00:00
|
|
|
PortBindings: portBindings,
|
|
|
|
Binds: binds,
|
2014-06-20 03:30:42 +00:00
|
|
|
NetworkMode: netMode,
|
2014-06-06 23:40:48 +00:00
|
|
|
})
|
2014-07-02 18:21:29 +00:00
|
|
|
return DockerID(dockerContainer.ID), err
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2014-06-25 23:24:20 +00:00
|
|
|
// Kill a docker container
|
|
|
|
func (kl *Kubelet) killContainer(container docker.APIContainers) error {
|
|
|
|
err := kl.DockerClient.StopContainer(container.ID, 10)
|
2014-07-02 18:21:29 +00:00
|
|
|
manifestID, containerName := parseDockerName(container.Names[0])
|
2014-06-09 03:35:07 +00:00
|
|
|
kl.LogEvent(&api.Event{
|
2014-06-06 23:40:48 +00:00
|
|
|
Event: "STOP",
|
|
|
|
Manifest: &api.ContainerManifest{
|
2014-07-02 18:21:29 +00:00
|
|
|
ID: manifestID,
|
2014-06-06 23:40:48 +00:00
|
|
|
},
|
|
|
|
Container: &api.Container{
|
|
|
|
Name: containerName,
|
|
|
|
},
|
|
|
|
})
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2014-06-19 20:06:52 +00:00
|
|
|
func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) {
|
2014-06-09 20:47:25 +00:00
|
|
|
var file *os.File
|
|
|
|
var err error
|
2014-06-19 20:06:52 +00:00
|
|
|
var manifest api.ContainerManifest
|
|
|
|
|
2014-06-09 20:47:25 +00:00
|
|
|
if file, err = os.Open(name); err != nil {
|
2014-06-19 20:06:52 +00:00
|
|
|
return manifest, err
|
2014-06-09 20:47:25 +00:00
|
|
|
}
|
2014-06-19 21:59:52 +00:00
|
|
|
defer file.Close()
|
2014-06-09 20:47:25 +00:00
|
|
|
|
2014-06-19 20:06:52 +00:00
|
|
|
data, err := ioutil.ReadAll(file)
|
|
|
|
if err != nil {
|
2014-06-25 03:51:57 +00:00
|
|
|
glog.Errorf("Couldn't read from file: %v", err)
|
2014-06-19 20:06:52 +00:00
|
|
|
return manifest, err
|
|
|
|
}
|
|
|
|
if err = kl.ExtractYAMLData(data, &manifest); err != nil {
|
|
|
|
return manifest, err
|
|
|
|
}
|
|
|
|
return manifest, nil
|
2014-06-09 20:47:25 +00:00
|
|
|
}
|
|
|
|
|
2014-06-19 20:06:52 +00:00
|
|
|
func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) {
|
|
|
|
var manifests []api.ContainerManifest
|
|
|
|
|
2014-06-24 19:02:18 +00:00
|
|
|
files, err := filepath.Glob(filepath.Join(name, "[^.]*"))
|
2014-06-09 20:47:25 +00:00
|
|
|
if err != nil {
|
2014-06-19 20:06:52 +00:00
|
|
|
return manifests, err
|
2014-06-09 20:47:25 +00:00
|
|
|
}
|
2014-06-19 20:06:52 +00:00
|
|
|
|
2014-06-19 22:14:57 +00:00
|
|
|
sort.Strings(files)
|
|
|
|
|
2014-06-19 20:06:52 +00:00
|
|
|
for _, file := range files {
|
|
|
|
manifest, err := kl.extractFromFile(file)
|
|
|
|
if err != nil {
|
2014-06-25 03:51:57 +00:00
|
|
|
glog.Errorf("Couldn't read from file %s: %v", file, err)
|
2014-06-19 20:06:52 +00:00
|
|
|
return manifests, err
|
|
|
|
}
|
|
|
|
manifests = append(manifests, manifest)
|
2014-06-09 20:47:25 +00:00
|
|
|
}
|
2014-06-19 20:06:52 +00:00
|
|
|
return manifests, nil
|
2014-06-20 16:31:18 +00:00
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// WatchFiles watches a file or direcory of files for changes to the set of pods that
|
2014-06-24 04:14:15 +00:00
|
|
|
// should run on this Kubelet.
|
2014-07-10 12:26:24 +00:00
|
|
|
func (kl *Kubelet) WatchFiles(configPath string, updateChannel chan<- manifestUpdate) {
|
2014-06-23 22:06:28 +00:00
|
|
|
var err error
|
2014-06-19 20:06:52 +00:00
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
statInfo, err := os.Stat(configPath)
|
2014-06-23 22:06:28 +00:00
|
|
|
if err != nil {
|
2014-06-24 00:58:21 +00:00
|
|
|
if !os.IsNotExist(err) {
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Errorf("Error accessing path: %v", err)
|
2014-06-24 00:58:21 +00:00
|
|
|
}
|
2014-06-23 22:06:28 +00:00
|
|
|
return
|
|
|
|
}
|
2014-06-24 04:14:15 +00:00
|
|
|
if statInfo.Mode().IsDir() {
|
2014-07-10 12:26:24 +00:00
|
|
|
manifests, err := kl.extractFromDir(configPath)
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Errorf("Error polling dir: %v", err)
|
2014-06-23 22:06:28 +00:00
|
|
|
return
|
2014-06-19 20:06:52 +00:00
|
|
|
}
|
2014-06-23 22:06:28 +00:00
|
|
|
updateChannel <- manifestUpdate{fileSource, manifests}
|
2014-06-24 04:14:15 +00:00
|
|
|
} else if statInfo.Mode().IsRegular() {
|
2014-07-10 12:26:24 +00:00
|
|
|
manifest, err := kl.extractFromFile(configPath)
|
2014-06-23 22:06:28 +00:00
|
|
|
if err != nil {
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Errorf("Error polling file: %v", err)
|
2014-06-23 22:06:28 +00:00
|
|
|
return
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-06-23 22:06:28 +00:00
|
|
|
updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}}
|
2014-06-24 04:14:15 +00:00
|
|
|
} else {
|
2014-06-25 03:51:57 +00:00
|
|
|
glog.Errorf("Error accessing config - not a directory or file")
|
2014-06-24 04:14:15 +00:00
|
|
|
return
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-06-21 21:20:35 +00:00
|
|
|
func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpdate) error {
|
2014-06-06 23:40:48 +00:00
|
|
|
request, err := http.NewRequest("GET", url, nil)
|
|
|
|
if err != nil {
|
2014-06-20 16:31:18 +00:00
|
|
|
return err
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-06-23 22:06:28 +00:00
|
|
|
response, err := http.DefaultClient.Do(request)
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
2014-06-20 16:31:18 +00:00
|
|
|
return err
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
defer response.Body.Close()
|
2014-06-23 22:47:25 +00:00
|
|
|
data, err := ioutil.ReadAll(response.Body)
|
2014-06-21 21:20:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2014-06-23 22:47:25 +00:00
|
|
|
if len(data) == 0 {
|
|
|
|
return fmt.Errorf("zero-length data received from %v", url)
|
|
|
|
}
|
|
|
|
|
|
|
|
// First try as if it's a single manifest
|
|
|
|
var manifest api.ContainerManifest
|
|
|
|
singleErr := yaml.Unmarshal(data, &manifest)
|
|
|
|
if singleErr == nil && manifest.Version == "" {
|
2014-06-24 17:19:47 +00:00
|
|
|
// If data is a []ContainerManifest, trying to put it into a ContainerManifest
|
|
|
|
// will not give an error but also won't set any of the fields.
|
|
|
|
// Our docs say that the version field is mandatory, so using that to judge wether
|
|
|
|
// this was actually successful.
|
2014-06-23 22:47:25 +00:00
|
|
|
singleErr = fmt.Errorf("got blank version field")
|
|
|
|
}
|
|
|
|
if singleErr == nil {
|
|
|
|
updateChannel <- manifestUpdate{httpClientSource, []api.ContainerManifest{manifest}}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// That didn't work, so try an array of manifests.
|
|
|
|
var manifests []api.ContainerManifest
|
|
|
|
multiErr := yaml.Unmarshal(data, &manifests)
|
2014-06-24 17:19:47 +00:00
|
|
|
// We're not sure if the person reading the logs is going to care about the single or
|
|
|
|
// multiple manifest unmarshalling attempt, so we need to put both in the logs, as is
|
|
|
|
// done at the end. Hence not returning early here.
|
2014-06-28 06:46:02 +00:00
|
|
|
if multiErr == nil && len(manifests) > 0 && manifests[0].Version == "" {
|
2014-06-23 22:47:25 +00:00
|
|
|
multiErr = fmt.Errorf("got blank version field")
|
|
|
|
}
|
|
|
|
if multiErr == nil {
|
|
|
|
updateChannel <- manifestUpdate{httpClientSource, manifests}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return fmt.Errorf("%v: received '%v', but couldn't parse as a "+
|
2014-06-29 05:16:26 +00:00
|
|
|
"single manifest (%v: %+v) or as multiple manifests (%v: %+v).\n",
|
2014-06-23 22:47:25 +00:00
|
|
|
url, string(data), singleErr, manifest, multiErr, manifests)
|
2014-06-09 20:47:25 +00:00
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// ResponseToManifests takes an etcd Response object, and turns it into a structured list of containers.
|
|
|
|
// It returns a list of containers, or an error if one occurs.
|
2014-06-09 03:35:07 +00:00
|
|
|
func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
|
2014-06-06 23:40:48 +00:00
|
|
|
if response.Node == nil || len(response.Node.Value) == 0 {
|
2014-06-29 05:16:26 +00:00
|
|
|
return nil, fmt.Errorf("no nodes field: %v", response)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
var manifests []api.ContainerManifest
|
2014-06-09 03:35:07 +00:00
|
|
|
err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
|
2014-06-06 23:40:48 +00:00
|
|
|
return manifests, err
|
|
|
|
}
|
|
|
|
|
2014-06-21 21:20:35 +00:00
|
|
|
func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error {
|
2014-06-27 01:14:13 +00:00
|
|
|
response, err := kl.EtcdClient.Get(key, true, false)
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
2014-06-30 19:00:14 +00:00
|
|
|
if tools.IsEtcdNotFound(err) {
|
2014-06-24 00:58:21 +00:00
|
|
|
return nil
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Errorf("Error on etcd get of %s: %v", key, err)
|
2014-06-06 23:40:48 +00:00
|
|
|
return err
|
|
|
|
}
|
2014-06-09 03:35:07 +00:00
|
|
|
manifests, err := kl.ResponseToManifests(response)
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Errorf("Error parsing response (%v): %s", response, err)
|
2014-06-06 23:40:48 +00:00
|
|
|
return err
|
|
|
|
}
|
2014-06-25 03:51:57 +00:00
|
|
|
glog.Infof("Got state from etcd: %+v", manifests)
|
2014-06-21 21:20:35 +00:00
|
|
|
updateChannel <- manifestUpdate{etcdSource, manifests}
|
2014-06-06 23:40:48 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// SyncAndSetupEtcdWatch synchronizes with etcd, and sets up an etcd watch for new configurations.
|
2014-06-06 23:40:48 +00:00
|
|
|
// The channel to send new configurations across
|
|
|
|
// This function loops forever and is intended to be run in a go routine.
|
2014-06-21 21:20:35 +00:00
|
|
|
func (kl *Kubelet) SyncAndSetupEtcdWatch(updateChannel chan<- manifestUpdate) {
|
2014-06-27 01:14:13 +00:00
|
|
|
key := path.Join("registry", "hosts", strings.TrimSpace(kl.Hostname), "kubelet")
|
|
|
|
|
2014-06-06 23:40:48 +00:00
|
|
|
// First fetch the initial configuration (watch only gives changes...)
|
|
|
|
for {
|
2014-06-21 21:20:35 +00:00
|
|
|
err := kl.getKubeletStateFromEtcd(key, updateChannel)
|
2014-06-06 23:40:48 +00:00
|
|
|
if err == nil {
|
|
|
|
// We got a successful response, etcd is up, set up the watch.
|
|
|
|
break
|
|
|
|
}
|
|
|
|
time.Sleep(30 * time.Second)
|
|
|
|
}
|
|
|
|
|
|
|
|
done := make(chan bool)
|
2014-06-09 03:35:07 +00:00
|
|
|
go util.Forever(func() { kl.TimeoutWatch(done) }, 0)
|
2014-06-06 23:40:48 +00:00
|
|
|
for {
|
|
|
|
// The etcd client will close the watch channel when it exits. So we need
|
|
|
|
// to create and service a new one every time.
|
|
|
|
watchChannel := make(chan *etcd.Response)
|
|
|
|
// We don't push this through Forever because if it dies, we just do it again in 30 secs.
|
|
|
|
// anyway.
|
2014-06-21 21:20:35 +00:00
|
|
|
go kl.WatchEtcd(watchChannel, updateChannel)
|
2014-06-06 23:40:48 +00:00
|
|
|
|
2014-06-21 21:20:35 +00:00
|
|
|
kl.getKubeletStateFromEtcd(key, updateChannel)
|
2014-06-25 05:05:25 +00:00
|
|
|
glog.V(1).Infof("Setting up a watch for configuration changes in etcd for %s", key)
|
2014-06-24 20:17:38 +00:00
|
|
|
kl.EtcdClient.Watch(key, 0, true, watchChannel, done)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// TimeoutWatch timeout the watch after 30 seconds.
|
2014-06-09 03:35:07 +00:00
|
|
|
func (kl *Kubelet) TimeoutWatch(done chan bool) {
|
2014-06-06 23:40:48 +00:00
|
|
|
t := time.Tick(30 * time.Second)
|
|
|
|
for _ = range t {
|
|
|
|
done <- true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// ExtractYAMLData extracts data from YAML file into a list of containers.
|
2014-06-09 03:35:07 +00:00
|
|
|
func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error {
|
2014-06-06 23:40:48 +00:00
|
|
|
err := yaml.Unmarshal(buf, output)
|
|
|
|
if err != nil {
|
2014-06-25 03:51:57 +00:00
|
|
|
glog.Errorf("Couldn't unmarshal configuration: %v", err)
|
2014-06-06 23:40:48 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-06-09 20:47:25 +00:00
|
|
|
func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerManifest, error) {
|
|
|
|
var manifests []api.ContainerManifest
|
|
|
|
if response.Node == nil || len(response.Node.Value) == 0 {
|
2014-06-29 05:16:26 +00:00
|
|
|
return manifests, fmt.Errorf("no nodes field: %v", response)
|
2014-06-09 20:47:25 +00:00
|
|
|
}
|
|
|
|
err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
|
|
|
|
return manifests, err
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// WatchEtcd watches etcd for changes, receives config objects from the etcd client watch.
|
2014-06-09 20:47:25 +00:00
|
|
|
// This function loops until the watchChannel is closed, and is intended to be run as a goroutine.
|
2014-06-21 21:20:35 +00:00
|
|
|
func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel chan<- manifestUpdate) {
|
2014-06-06 23:40:48 +00:00
|
|
|
defer util.HandleCrash()
|
|
|
|
for {
|
|
|
|
watchResponse := <-watchChannel
|
|
|
|
// This means the channel has been closed.
|
|
|
|
if watchResponse == nil {
|
|
|
|
return
|
|
|
|
}
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Infof("Got etcd change: %v", watchResponse)
|
2014-06-09 20:47:25 +00:00
|
|
|
manifests, err := kl.extractFromEtcd(watchResponse)
|
|
|
|
if err != nil {
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Errorf("Error handling response from etcd: %v", err)
|
2014-06-06 23:40:48 +00:00
|
|
|
continue
|
|
|
|
}
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Infof("manifests: %+v", manifests)
|
2014-06-06 23:40:48 +00:00
|
|
|
// Ok, we have a valid configuration, send to channel for
|
|
|
|
// rejiggering.
|
2014-06-21 21:20:35 +00:00
|
|
|
updateChannel <- manifestUpdate{etcdSource, manifests}
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-06-21 20:16:20 +00:00
|
|
|
const networkContainerName = "net"
|
2014-06-20 03:30:42 +00:00
|
|
|
|
2014-06-27 19:09:53 +00:00
|
|
|
// Return the docker ID for a manifest's network container. Returns an empty string if it doesn't exist.
|
2014-07-02 18:21:29 +00:00
|
|
|
func (kl *Kubelet) getNetworkContainerID(manifest *api.ContainerManifest) (DockerID, error) {
|
|
|
|
return kl.getContainerID(manifest, &api.Container{Name: networkContainerName})
|
2014-06-20 03:30:42 +00:00
|
|
|
}
|
|
|
|
|
2014-06-27 19:09:53 +00:00
|
|
|
// Create a network container for a manifest. Returns the docker container ID of the newly created container.
|
2014-07-02 18:21:29 +00:00
|
|
|
func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (DockerID, error) {
|
2014-06-20 03:30:42 +00:00
|
|
|
var ports []api.Port
|
|
|
|
// Docker only exports ports from the network container. Let's
|
2014-06-20 15:55:02 +00:00
|
|
|
// collect all of the relevant ports and export them.
|
2014-06-20 03:30:42 +00:00
|
|
|
for _, container := range manifest.Containers {
|
|
|
|
ports = append(ports, container.Ports...)
|
|
|
|
}
|
|
|
|
container := &api.Container{
|
|
|
|
Name: networkContainerName,
|
|
|
|
Image: "busybox",
|
|
|
|
Command: []string{"sh", "-c", "rm -f nap && mkfifo nap && exec cat nap"},
|
|
|
|
Ports: ports,
|
|
|
|
}
|
2014-06-24 23:31:33 +00:00
|
|
|
kl.DockerPuller.Pull("busybox")
|
2014-06-25 23:24:20 +00:00
|
|
|
return kl.runContainer(manifest, container, "")
|
2014-06-20 03:30:42 +00:00
|
|
|
}
|
|
|
|
|
2014-07-02 18:21:29 +00:00
|
|
|
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel chan<- DockerID) error {
|
2014-07-01 05:27:56 +00:00
|
|
|
// Make sure we have a network container
|
2014-07-02 18:21:29 +00:00
|
|
|
netID, err := kl.getNetworkContainerID(manifest)
|
2014-07-01 05:27:56 +00:00
|
|
|
if err != nil {
|
2014-07-01 21:01:42 +00:00
|
|
|
glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID)
|
2014-07-01 05:27:56 +00:00
|
|
|
return err
|
|
|
|
}
|
2014-07-02 18:21:29 +00:00
|
|
|
if netID == "" {
|
2014-07-01 05:27:56 +00:00
|
|
|
glog.Infof("Network container doesn't exist, creating")
|
2014-07-02 18:21:29 +00:00
|
|
|
netID, err = kl.createNetworkContainer(manifest)
|
2014-06-20 03:30:42 +00:00
|
|
|
if err != nil {
|
2014-07-01 21:01:42 +00:00
|
|
|
glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID)
|
2014-07-01 05:27:56 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2014-07-02 18:21:29 +00:00
|
|
|
keepChannel <- netID
|
2014-07-01 05:27:56 +00:00
|
|
|
for _, container := range manifest.Containers {
|
2014-07-02 18:21:29 +00:00
|
|
|
containerID, err := kl.getContainerID(manifest, &container)
|
2014-07-01 05:27:56 +00:00
|
|
|
if err != nil {
|
2014-07-01 21:01:42 +00:00
|
|
|
glog.Errorf("Error finding container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
|
2014-06-20 03:30:42 +00:00
|
|
|
continue
|
|
|
|
}
|
2014-07-02 18:21:29 +00:00
|
|
|
if containerID == "" {
|
2014-07-01 05:27:56 +00:00
|
|
|
glog.Infof("%+v doesn't exist, creating", container)
|
|
|
|
kl.DockerPuller.Pull(container.Image)
|
2014-06-20 03:30:42 +00:00
|
|
|
if err != nil {
|
2014-07-01 21:01:42 +00:00
|
|
|
glog.Errorf("Failed to create container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
|
2014-06-25 23:24:20 +00:00
|
|
|
continue
|
2014-06-20 03:30:42 +00:00
|
|
|
}
|
2014-07-02 18:21:29 +00:00
|
|
|
containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID))
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
2014-07-01 05:27:56 +00:00
|
|
|
// TODO(bburns) : Perhaps blacklist a container after N failures?
|
2014-07-01 21:01:42 +00:00
|
|
|
glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err)
|
2014-06-06 23:40:48 +00:00
|
|
|
continue
|
|
|
|
}
|
2014-07-01 05:27:56 +00:00
|
|
|
} else {
|
2014-07-03 05:35:50 +00:00
|
|
|
glog.Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
|
2014-07-02 18:21:29 +00:00
|
|
|
glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
|
2014-07-03 05:35:50 +00:00
|
|
|
dockerContainer, err := kl.getContainer(containerID)
|
|
|
|
// TODO: This should probably be separated out into a separate goroutine.
|
|
|
|
healthy, err := kl.healthy(container, dockerContainer)
|
|
|
|
if err != nil {
|
|
|
|
glog.V(1).Infof("health check errored: %v", err)
|
|
|
|
continue
|
|
|
|
}
|
2014-07-11 17:02:59 +00:00
|
|
|
if healthy != CheckHealthy {
|
2014-07-03 05:35:50 +00:00
|
|
|
glog.V(1).Infof("manifest %s container %s is unhealthy.", manifest.ID, container.Name)
|
|
|
|
if err != nil {
|
|
|
|
glog.V(1).Infof("Failed to get container info %v, for %s", err, containerID)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
err = kl.killContainer(*dockerContainer)
|
|
|
|
if err != nil {
|
|
|
|
glog.V(1).Infof("Failed to kill container %s: %v", containerID, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
containerID, err = kl.runContainer(manifest, &container, "container:"+string(netID))
|
|
|
|
}
|
2014-07-01 05:27:56 +00:00
|
|
|
}
|
2014-07-02 18:21:29 +00:00
|
|
|
keepChannel <- containerID
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-07-01 05:27:56 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2014-07-03 01:06:54 +00:00
|
|
|
type empty struct{}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// SyncManifests synchronizes the configured list of containers (desired state) with the host current state.
|
2014-07-01 05:27:56 +00:00
|
|
|
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
|
|
|
glog.Infof("Desired: %+v", config)
|
|
|
|
var err error
|
2014-07-03 01:06:54 +00:00
|
|
|
dockerIdsToKeep := map[DockerID]empty{}
|
2014-07-14 20:12:44 +00:00
|
|
|
keepChannel := make(chan DockerID, defaultChanSize)
|
2014-07-01 05:27:56 +00:00
|
|
|
waitGroup := sync.WaitGroup{}
|
|
|
|
|
|
|
|
// Check for any containers that need starting
|
2014-07-03 05:46:03 +00:00
|
|
|
for ix := range config {
|
2014-07-01 05:27:56 +00:00
|
|
|
waitGroup.Add(1)
|
2014-07-03 05:46:03 +00:00
|
|
|
go func(index int) {
|
2014-07-01 16:37:45 +00:00
|
|
|
defer util.HandleCrash()
|
|
|
|
defer waitGroup.Done()
|
2014-07-03 05:46:03 +00:00
|
|
|
// necessary to dereference by index here b/c otherwise the shared value
|
|
|
|
// in the for each is re-used.
|
|
|
|
err := kl.syncManifest(&config[index], keepChannel)
|
2014-07-01 05:27:56 +00:00
|
|
|
if err != nil {
|
|
|
|
glog.Errorf("Error syncing manifest: %v skipping.", err)
|
|
|
|
}
|
2014-07-03 05:46:03 +00:00
|
|
|
}(ix)
|
2014-07-01 05:27:56 +00:00
|
|
|
}
|
2014-07-03 01:06:54 +00:00
|
|
|
ch := make(chan bool)
|
2014-07-01 16:37:45 +00:00
|
|
|
go func() {
|
|
|
|
for id := range keepChannel {
|
2014-07-03 01:06:54 +00:00
|
|
|
dockerIdsToKeep[id] = empty{}
|
2014-07-01 16:37:45 +00:00
|
|
|
}
|
2014-07-03 01:06:54 +00:00
|
|
|
ch <- true
|
2014-07-01 16:37:45 +00:00
|
|
|
}()
|
|
|
|
if len(config) > 0 {
|
|
|
|
waitGroup.Wait()
|
|
|
|
}
|
2014-07-03 01:06:54 +00:00
|
|
|
close(keepChannel)
|
|
|
|
<-ch
|
2014-06-25 23:24:20 +00:00
|
|
|
|
|
|
|
// Kill any containers we don't need
|
|
|
|
existingContainers, err := kl.getDockerContainers()
|
|
|
|
if err != nil {
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Errorf("Error listing containers: %v", err)
|
2014-06-25 23:24:20 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
for id, container := range existingContainers {
|
2014-07-03 01:06:54 +00:00
|
|
|
if _, ok := dockerIdsToKeep[id]; !ok {
|
2014-06-25 23:24:20 +00:00
|
|
|
glog.Infof("Killing: %s", id)
|
|
|
|
err = kl.killContainer(container)
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Errorf("Error killing container: %v", err)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2014-07-08 04:48:47 +00:00
|
|
|
// Check that all Port.HostPort values are unique across all manifests.
|
2014-07-08 06:20:30 +00:00
|
|
|
func checkHostPortConflicts(allManifests []api.ContainerManifest, newManifest *api.ContainerManifest) []error {
|
|
|
|
allErrs := []error{}
|
|
|
|
|
2014-07-08 04:48:47 +00:00
|
|
|
allPorts := map[int]bool{}
|
|
|
|
extract := func(p *api.Port) int { return p.HostPort }
|
|
|
|
for i := range allManifests {
|
|
|
|
manifest := &allManifests[i]
|
2014-07-08 06:20:30 +00:00
|
|
|
errs := api.AccumulateUniquePorts(manifest.Containers, allPorts, extract)
|
|
|
|
if len(errs) != 0 {
|
|
|
|
allErrs = append(allErrs, errs...)
|
2014-07-08 04:48:47 +00:00
|
|
|
}
|
|
|
|
}
|
2014-07-08 06:20:30 +00:00
|
|
|
if errs := api.AccumulateUniquePorts(newManifest.Containers, allPorts, extract); len(errs) != 0 {
|
|
|
|
allErrs = append(allErrs, errs...)
|
|
|
|
}
|
|
|
|
return allErrs
|
2014-07-08 04:48:47 +00:00
|
|
|
}
|
|
|
|
|
2014-07-01 20:01:39 +00:00
|
|
|
// syncLoop is the main loop for processing changes. It watches for changes from
|
2014-06-20 16:31:18 +00:00
|
|
|
// four channels (file, etcd, server, and http) and creates a union of them. For
|
2014-06-06 23:40:48 +00:00
|
|
|
// any new change seen, will run a sync against desired state and running state. If
|
|
|
|
// no changes are seen to the configuration, will synchronize the last known desired
|
|
|
|
// state every sync_frequency seconds.
|
|
|
|
// Never returns.
|
2014-07-01 20:01:39 +00:00
|
|
|
func (kl *Kubelet) syncLoop(updateChannel <-chan manifestUpdate, handler SyncHandler) {
|
2014-06-21 21:20:35 +00:00
|
|
|
last := make(map[string][]api.ContainerManifest)
|
2014-06-06 23:40:48 +00:00
|
|
|
for {
|
|
|
|
select {
|
2014-06-21 21:20:35 +00:00
|
|
|
case u := <-updateChannel:
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Infof("Got configuration from %s: %+v", u.source, u.manifests)
|
2014-06-21 21:20:35 +00:00
|
|
|
last[u.source] = u.manifests
|
2014-06-09 03:35:07 +00:00
|
|
|
case <-time.After(kl.SyncFrequency):
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2014-07-01 20:01:39 +00:00
|
|
|
allManifests := []api.ContainerManifest{}
|
2014-07-08 04:48:47 +00:00
|
|
|
allIds := util.StringSet{}
|
2014-07-01 20:01:39 +00:00
|
|
|
for src, srcManifests := range last {
|
|
|
|
for i := range srcManifests {
|
2014-07-08 06:20:30 +00:00
|
|
|
allErrs := []error{}
|
|
|
|
|
2014-07-01 20:01:39 +00:00
|
|
|
m := &srcManifests[i]
|
2014-07-08 04:48:47 +00:00
|
|
|
if allIds.Has(m.ID) {
|
2014-07-08 06:20:30 +00:00
|
|
|
allErrs = append(allErrs, api.ValidationError{api.ErrTypeDuplicate, "ContainerManifest.ID", m.ID})
|
|
|
|
} else {
|
|
|
|
allIds.Insert(m.ID)
|
2014-07-08 04:48:47 +00:00
|
|
|
}
|
2014-07-08 06:20:30 +00:00
|
|
|
if errs := api.ValidateManifest(m); len(errs) != 0 {
|
|
|
|
allErrs = append(allErrs, errs...)
|
2014-07-01 20:01:39 +00:00
|
|
|
}
|
2014-07-08 04:48:47 +00:00
|
|
|
// Check for host-wide HostPort conflicts.
|
2014-07-08 06:20:30 +00:00
|
|
|
if errs := checkHostPortConflicts(allManifests, m); len(errs) != 0 {
|
|
|
|
allErrs = append(allErrs, errs...)
|
|
|
|
}
|
|
|
|
if len(allErrs) > 0 {
|
|
|
|
glog.Warningf("Manifest from %s failed validation, ignoring: %v", src, allErrs)
|
2014-07-08 04:48:47 +00:00
|
|
|
}
|
2014-07-01 20:01:39 +00:00
|
|
|
}
|
2014-07-08 04:48:47 +00:00
|
|
|
// TODO(thockin): There's no reason to collect manifests by value. Don't pessimize.
|
2014-07-01 20:01:39 +00:00
|
|
|
allManifests = append(allManifests, srcManifests...)
|
2014-06-21 21:20:35 +00:00
|
|
|
}
|
2014-06-20 16:31:18 +00:00
|
|
|
|
2014-07-01 20:01:39 +00:00
|
|
|
err := handler.SyncManifests(allManifests)
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
2014-06-29 05:16:26 +00:00
|
|
|
glog.Errorf("Couldn't sync containers : %v", err)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// getContainerIDFromName looks at the list of containers on the machine and returns the ID of the container whose name
|
2014-06-25 23:24:20 +00:00
|
|
|
// matches 'name'. It returns the name of the container, or empty string, if the container isn't found.
|
|
|
|
// it returns true if the container is found, false otherwise, and any error that occurs.
|
|
|
|
// TODO: This functions exists to support GetContainerInfo and GetContainerStats
|
|
|
|
// It should be removed once those two functions start taking proper pod.IDs
|
2014-07-10 12:26:24 +00:00
|
|
|
func (kl *Kubelet) getContainerIDFromName(name string) (DockerID, bool, error) {
|
2014-06-25 23:24:20 +00:00
|
|
|
containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
|
|
|
|
if err != nil {
|
|
|
|
return "", false, err
|
|
|
|
}
|
|
|
|
for _, value := range containerList {
|
|
|
|
if strings.Contains(value.Names[0], name) {
|
2014-07-02 18:21:29 +00:00
|
|
|
return DockerID(value.ID), true, nil
|
2014-06-25 23:24:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return "", false, nil
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// GetPodInfo returns docker info for all containers in the pod/manifest.
|
2014-07-01 22:35:56 +00:00
|
|
|
func (kl *Kubelet) GetPodInfo(podID string) (api.PodInfo, error) {
|
|
|
|
info := api.PodInfo{}
|
|
|
|
|
|
|
|
containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
|
2014-06-06 23:40:48 +00:00
|
|
|
if err != nil {
|
2014-07-01 02:46:10 +00:00
|
|
|
return nil, err
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-07-01 22:35:56 +00:00
|
|
|
|
|
|
|
for _, value := range containerList {
|
|
|
|
manifestID, containerName := parseDockerName(value.Names[0])
|
|
|
|
if manifestID != podID {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
inspectResult, err := kl.DockerClient.InspectContainer(value.ID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2014-07-02 00:24:17 +00:00
|
|
|
if inspectResult == nil {
|
|
|
|
// Why did we not get an error?
|
|
|
|
info[containerName] = docker.Container{}
|
|
|
|
} else {
|
|
|
|
info[containerName] = *inspectResult
|
|
|
|
}
|
2014-07-01 22:35:56 +00:00
|
|
|
}
|
2014-07-01 02:46:10 +00:00
|
|
|
return info, nil
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
2014-06-19 01:26:23 +00:00
|
|
|
|
2014-07-01 21:05:10 +00:00
|
|
|
// Returns the docker id corresponding to pod-id-container-name pair.
|
|
|
|
func (kl *Kubelet) getDockerIDFromPodIDAndContainerName(podID, containerName string) (DockerID, error) {
|
|
|
|
containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
2014-06-19 01:26:23 +00:00
|
|
|
}
|
2014-07-01 21:05:10 +00:00
|
|
|
for _, value := range containerList {
|
|
|
|
manifestID, cName := parseDockerName(value.Names[0])
|
|
|
|
if manifestID == podID && cName == containerName {
|
|
|
|
return DockerID(value.ID), nil
|
|
|
|
}
|
2014-06-19 01:26:23 +00:00
|
|
|
}
|
2014-07-01 21:05:10 +00:00
|
|
|
return "", errors.New("couldn't find container")
|
|
|
|
}
|
2014-06-19 01:26:23 +00:00
|
|
|
|
2014-07-01 21:05:10 +00:00
|
|
|
// This method takes a container's absolute path and returns the stats for the
|
|
|
|
// container. The container's absolute path refers to its hierarchy in the
|
|
|
|
// cgroup file system. e.g. The root container, which represents the whole
|
|
|
|
// machine, has path "/"; all docker containers have path "/docker/<docker id>"
|
|
|
|
func (kl *Kubelet) statsFromContainerPath(containerPath string) (*api.ContainerStats, error) {
|
|
|
|
info, err := kl.CadvisorClient.ContainerInfo(containerPath)
|
2014-06-19 01:26:23 +00:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2014-06-20 04:06:28 +00:00
|
|
|
// When the stats data for the container is not available yet.
|
2014-06-19 01:26:23 +00:00
|
|
|
if info.StatsPercentiles == nil {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
ret := new(api.ContainerStats)
|
|
|
|
ret.MaxMemoryUsage = info.StatsPercentiles.MaxMemoryUsage
|
|
|
|
if len(info.StatsPercentiles.CpuUsagePercentiles) > 0 {
|
|
|
|
percentiles := make([]api.Percentile, len(info.StatsPercentiles.CpuUsagePercentiles))
|
|
|
|
for i, p := range info.StatsPercentiles.CpuUsagePercentiles {
|
|
|
|
percentiles[i].Percentage = p.Percentage
|
|
|
|
percentiles[i].Value = p.Value
|
|
|
|
}
|
|
|
|
ret.CpuUsagePercentiles = percentiles
|
|
|
|
}
|
|
|
|
if len(info.StatsPercentiles.MemoryUsagePercentiles) > 0 {
|
|
|
|
percentiles := make([]api.Percentile, len(info.StatsPercentiles.MemoryUsagePercentiles))
|
|
|
|
for i, p := range info.StatsPercentiles.MemoryUsagePercentiles {
|
|
|
|
percentiles[i].Percentage = p.Percentage
|
|
|
|
percentiles[i].Value = p.Value
|
|
|
|
}
|
|
|
|
ret.MemoryUsagePercentiles = percentiles
|
|
|
|
}
|
|
|
|
return ret, nil
|
|
|
|
}
|
2014-07-01 21:05:10 +00:00
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// GetContainerStats returns stats (from Cadvisor) for a container.
|
2014-07-01 21:05:10 +00:00
|
|
|
func (kl *Kubelet) GetContainerStats(podID, containerName string) (*api.ContainerStats, error) {
|
|
|
|
if kl.CadvisorClient == nil {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
dockerID, err := kl.getDockerIDFromPodIDAndContainerName(podID, containerName)
|
|
|
|
if err != nil || len(dockerID) == 0 {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return kl.statsFromContainerPath(fmt.Sprintf("/docker/%s", string(dockerID)))
|
|
|
|
}
|
|
|
|
|
2014-07-10 12:26:24 +00:00
|
|
|
// GetMachineStats returns stats (from Cadvisor) of current machine.
|
2014-07-01 21:05:10 +00:00
|
|
|
func (kl *Kubelet) GetMachineStats() (*api.ContainerStats, error) {
|
|
|
|
return kl.statsFromContainerPath("/")
|
|
|
|
}
|
2014-07-03 05:35:50 +00:00
|
|
|
|
2014-07-11 17:02:59 +00:00
|
|
|
func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIContainers) (HealthCheckStatus, error) {
|
2014-07-03 05:35:50 +00:00
|
|
|
// Give the container 60 seconds to start up.
|
2014-07-09 23:53:31 +00:00
|
|
|
if container.LivenessProbe == nil {
|
2014-07-11 17:02:59 +00:00
|
|
|
return CheckHealthy, nil
|
2014-07-03 05:35:50 +00:00
|
|
|
}
|
|
|
|
if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
|
2014-07-11 17:02:59 +00:00
|
|
|
return CheckHealthy, nil
|
2014-07-03 05:35:50 +00:00
|
|
|
}
|
|
|
|
if kl.HealthChecker == nil {
|
2014-07-11 17:02:59 +00:00
|
|
|
return CheckHealthy, nil
|
2014-07-03 05:35:50 +00:00
|
|
|
}
|
2014-07-11 17:02:59 +00:00
|
|
|
return kl.HealthChecker.HealthCheck(container)
|
2014-07-03 05:35:50 +00:00
|
|
|
}
|