2014-06-06 23:40:48 +00:00
/*
Copyright 2014 Google Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
2015-03-06 07:56:30 +00:00
"errors"
2014-06-06 23:40:48 +00:00
"fmt"
2014-09-24 21:27:10 +00:00
"io"
2014-11-12 05:21:40 +00:00
"io/ioutil"
"net"
2014-06-06 23:40:48 +00:00
"net/http"
2014-11-07 06:41:16 +00:00
"os"
2014-07-29 17:20:50 +00:00
"path"
2014-10-28 00:29:55 +00:00
"sort"
2014-06-06 23:40:48 +00:00
"strconv"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
2015-02-23 21:04:45 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
2014-09-01 05:10:49 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
2014-09-16 14:04:12 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
2015-01-07 16:17:30 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
2015-01-08 15:25:14 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
2014-11-04 00:16:31 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
2015-03-06 07:56:30 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
2014-09-09 04:33:17 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
2015-01-08 15:25:14 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
2015-02-24 00:22:12 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
2014-11-23 15:47:25 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
2015-01-08 15:25:14 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
2015-01-25 03:22:18 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
2015-02-27 18:44:44 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
2015-01-14 23:22:21 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
2014-06-06 23:40:48 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
2015-03-06 07:56:30 +00:00
utilErrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
2015-02-27 18:44:44 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
2014-06-06 23:40:48 +00:00
"github.com/fsouza/go-dockerclient"
2014-06-25 03:51:57 +00:00
"github.com/golang/glog"
2015-03-06 07:56:30 +00:00
cadvisorApi "github.com/google/cadvisor/info/v1"
2014-06-06 23:40:48 +00:00
)
2015-03-06 00:37:08 +00:00
const (
	// CPU-share constants, taken from lmctfy
	// https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
	minShares     = 2
	sharesPerCPU  = 1024
	milliCPUToCPU = 1000

	// The oom_score_adj of the POD infrastructure container. The default is 0, so
	// any value below that makes it *less* likely to get OOM killed.
	podOomScoreAdj = -100

	// Max amount of time to wait for the Docker daemon to come up.
	maxWaitForDocker = 5 * time.Minute

	// Initial node status update frequency and incremental frequency, for faster cluster startup.
	// The update frequency will be incremented linearly, until it reaches status_update_frequency.
	initialNodeStatusUpdateFrequency = 100 * time.Millisecond
	nodeStatusUpdateFrequencyInc     = 500 * time.Millisecond

	// The retry count for updating node status at each sync period.
	nodeStatusUpdateRetry = 5
)
2015-02-20 03:17:44 +00:00
2015-03-06 07:56:30 +00:00
// ErrNoKubeletContainers is returned when there are no containers managed by
// the kubelet (i.e. either no containers on the node, or none that the kubelet
// cares about).
var ErrNoKubeletContainers = errors.New("no containers managed by kubelet")

// ErrContainerNotFound is returned when a container in the given pod with the
// given container name was not found, amongst those managed by the kubelet.
var ErrContainerNotFound = errors.New("no matching container")
2014-07-15 20:24:41 +00:00
// SyncHandler is an interface implemented by Kubelet, for testability.
type SyncHandler interface {
	// SyncPods syncs current state to match the specified pods. podSyncTypes
	// specifies what type of sync is occurring per pod. startTime specifies the
	// time at which syncing began (for use in monitoring).
	SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
}
2015-03-05 18:49:36 +00:00
// SourcesReadyFn returns true when the kubelet's configured pod sources are
// ready (semantics are defined by the caller that supplies the function).
type SourcesReadyFn func() bool

// volumeMap maps a pod-scoped volume name to its mounted volume implementation.
type volumeMap map[string]volume.Interface
2014-07-22 21:40:59 +00:00
// NewMainKubelet creates a new Kubelet for use in the main kubelet binary.
// It validates its arguments, waits (bounded by maxWaitForDocker) for the
// Docker daemon to come up, starts a service reflector when an API client is
// available, and wires up the docker cache, pod workers, data directories and
// volume plugins. Returns an error if validation or any setup step fails.
func NewMainKubelet(
	hostname string,
	dockerClient dockertools.DockerInterface,
	kubeClient client.Interface,
	rootDirectory string,
	podInfraContainerImage string,
	resyncInterval time.Duration,
	pullQPS float32,
	pullBurst int,
	minimumGCAge time.Duration,
	maxContainerCount int,
	sourcesReady SourcesReadyFn,
	clusterDomain string,
	clusterDNS net.IP,
	masterServiceNamespace string,
	volumePlugins []volume.Plugin,
	streamingConnectionIdleTimeout time.Duration,
	recorder record.EventRecorder,
	cadvisorInterface cadvisor.Interface,
	statusUpdateFrequency time.Duration) (*Kubelet, error) {
	// Argument validation.
	if rootDirectory == "" {
		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
	}
	if resyncInterval <= 0 {
		return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
	}
	if minimumGCAge <= 0 {
		return nil, fmt.Errorf("invalid minimum GC age %d", minimumGCAge)
	}
	// Wait for the Docker daemon to be up (with a timeout).
	waitStart := time.Now()
	dockerUp := false
	for time.Since(waitStart) < maxWaitForDocker {
		_, err := dockerClient.Version()
		if err == nil {
			dockerUp = true
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	if !dockerUp {
		return nil, fmt.Errorf("timed out waiting for Docker to come up")
	}

	serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
	if kubeClient != nil {
		// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
		// than an interface. There is no way to construct a list+watcher using resource name.
		listWatch := &cache.ListWatch{
			ListFunc: func() (runtime.Object, error) {
				return kubeClient.Services(api.NamespaceAll).List(labels.Everything())
			},
			WatchFunc: func(resourceVersion string) (watch.Interface, error) {
				return kubeClient.Services(api.NamespaceAll).Watch(labels.Everything(), labels.Everything(), resourceVersion)
			},
		}
		cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
	}
	serviceLister := &cache.StoreToServiceLister{serviceStore}

	// Wrap the docker client so calls are counted in metrics.
	dockerClient = metrics.NewInstrumentedDockerInterface(dockerClient)
	klet := &Kubelet{
		hostname:                       hostname,
		dockerClient:                   dockerClient,
		kubeClient:                     kubeClient,
		rootDirectory:                  rootDirectory,
		statusUpdateFrequency:          statusUpdateFrequency,
		resyncInterval:                 resyncInterval,
		podInfraContainerImage:         podInfraContainerImage,
		dockerIDToRef:                  map[dockertools.DockerID]*api.ObjectReference{},
		runner:                         dockertools.NewDockerContainerCommandRunner(dockerClient),
		httpClient:                     &http.Client{},
		pullQPS:                        pullQPS,
		pullBurst:                      pullBurst,
		minimumGCAge:                   minimumGCAge,
		maxContainerCount:              maxContainerCount,
		sourcesReady:                   sourcesReady,
		clusterDomain:                  clusterDomain,
		clusterDNS:                     clusterDNS,
		serviceLister:                  serviceLister,
		masterServiceNamespace:         masterServiceNamespace,
		prober:                         newProbeHolder(),
		readiness:                      newReadinessStates(),
		streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
		recorder:                       recorder,
		cadvisor:                       cadvisorInterface,
	}

	dockerCache, err := dockertools.NewDockerCache(dockerClient)
	if err != nil {
		return nil, err
	}
	klet.dockerCache = dockerCache
	klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod, recorder)

	metrics.Register(dockerCache)

	if err = klet.setupDataDirs(); err != nil {
		return nil, err
	}
	if err = klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil {
		return nil, err
	}

	klet.podStatuses = make(map[string]api.PodStatus)

	return klet, nil
}
2014-10-20 03:15:23 +00:00
// httpGetter abstracts http.Get so it can be faked in tests.
type httpGetter interface {
	Get(url string) (*http.Response, error)
}

// serviceLister abstracts listing services; satisfied by
// cache.StoreToServiceLister (see NewMainKubelet).
type serviceLister interface {
	List() (api.ServiceList, error)
}
2014-07-10 12:26:24 +00:00
// Kubelet is the main kubelet implementation.
type Kubelet struct {
	hostname               string
	dockerClient           dockertools.DockerInterface
	dockerCache            dockertools.DockerCache
	kubeClient             client.Interface
	rootDirectory          string
	podInfraContainerImage string
	podWorkers             *podWorkers
	statusUpdateFrequency  time.Duration
	resyncInterval         time.Duration
	sourcesReady           SourcesReadyFn

	// Protects the pods array.
	// We make complete array copies out of this while locked, which is OK because once added to this array,
	// pods are immutable.
	podLock sync.RWMutex
	pods    []api.BoundPod

	// Needed to report events for containers belonging to deleted/modified pods.
	// Tracks references for reporting events; guarded by refLock.
	dockerIDToRef map[dockertools.DockerID]*api.ObjectReference
	refLock       sync.RWMutex

	// Optional, defaults to simple Docker implementation
	dockerPuller dockertools.DockerPuller
	// Optional, defaults to /logs/ from /var/log
	logServer http.Handler
	// Optional, defaults to simple Docker implementation
	runner dockertools.ContainerCommandRunner
	// Optional, client for http requests, defaults to empty client
	httpClient httpGetter
	// Optional, maximum pull QPS from the docker registry, 0.0 means unlimited.
	pullQPS float32
	// Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0
	pullBurst int

	// cAdvisor used for container information.
	cadvisor cadvisor.Interface

	// Optional, minimum age required for garbage collection. If zero, no limit.
	minimumGCAge time.Duration
	// Max dead containers kept per (pod UID, container name); 0 disables GC
	// (see GarbageCollectContainers).
	maxContainerCount int

	// If non-empty, use this for container DNS search.
	clusterDomain string
	// If non-nil, use this for container DNS server.
	clusterDNS net.IP

	masterServiceNamespace string
	serviceLister          serviceLister

	// Volume plugins.
	volumePluginMgr volume.PluginMgr

	// probe runner holder
	prober probeHolder
	// container readiness state holder
	readiness *readinessStates

	// how long to keep idle streaming command execution/port forwarding
	// connections open before terminating them
	streamingConnectionIdleTimeout time.Duration

	// the EventRecorder to use
	recorder record.EventRecorder

	// A pod status cache currently used to store rejected pods and their statuses.
	// Guarded by podStatusesLock.
	podStatusesLock sync.RWMutex
	podStatuses     map[string]api.PodStatus
}
2014-11-23 15:47:25 +00:00
// getRootDir returns the full path to the directory under which kubelet can
// store data. These functions are useful to pass interfaces to other modules
// that may need to know where to write data without getting a whole kubelet
// instance.
func (kl *Kubelet) getRootDir() string {
	return kl.rootDirectory
}
2014-11-23 15:47:25 +00:00
// getPodsDir returns the full path to the directory under which pod
// directories are created.
func (kl *Kubelet) getPodsDir() string {
	return path.Join(kl.getRootDir(), "pods")
}
// getPluginsDir returns the full path to the directory under which plugin
// directories are created. Plugins can use these directories for data that
// they need to persist. Plugins should create subdirectories under this named
// after their own names.
func (kl *Kubelet) getPluginsDir() string {
	return path.Join(kl.getRootDir(), "plugins")
}
// getPluginDir returns a data directory name for a given plugin name.
// Plugins can use these directories to store data that they need to persist.
// For per-pod plugin data, see getPodPluginDir.
func (kl *Kubelet) getPluginDir(pluginName string) string {
	return path.Join(kl.getPluginsDir(), pluginName)
}
2014-11-23 15:47:25 +00:00
// getPodDir returns the full path to the per-pod data directory for the
// specified pod. This directory may not exist if the pod does not exist.
func (kl *Kubelet) getPodDir(podUID types.UID) string {
	// Backwards compat. The "old" stuff should be removed before 1.0
	// release. The thinking here is this:
	//     !old && !new = use new
	//     !old && new  = use new
	//     old && !new  = use old
	//     old && new   = use new (but warn)
	oldPath := path.Join(kl.getRootDir(), string(podUID))
	oldExists := dirExists(oldPath)
	newPath := path.Join(kl.getPodsDir(), string(podUID))
	newExists := dirExists(newPath)
	if oldExists && !newExists {
		return oldPath
	}
	if oldExists {
		glog.Warningf("Data dir for pod %q exists in both old and new form, using new", podUID)
	}
	return newPath
}
2014-11-23 15:47:25 +00:00
// getPodVolumesDir returns the full path to the per-pod data directory under
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodVolumesDir(podUID types.UID) string {
	return path.Join(kl.getPodDir(podUID), "volumes")
}
2014-11-23 15:47:25 +00:00
// getPodVolumeDir returns the full path to the directory which represents the
// named volume under the named plugin for specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
	return path.Join(kl.getPodVolumesDir(podUID), pluginName, volumeName)
}
// getPodPluginsDir returns the full path to the per-pod data directory under
// which plugins may store data for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodPluginsDir(podUID types.UID) string {
	return path.Join(kl.getPodDir(podUID), "plugins")
}
// getPodPluginDir returns a data directory name for a given plugin name for a
// given pod UID. Plugins can use these directories to store data that they
// need to persist. For non-per-pod plugin data, see getPluginDir.
func (kl *Kubelet) getPodPluginDir(podUID types.UID, pluginName string) string {
	return path.Join(kl.getPodPluginsDir(podUID), pluginName)
}
// getPodContainerDir returns the full path to the per-pod data directory under
// which container data is held for the specified pod. This directory may not
// exist if the pod or container does not exist.
func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
	// Backwards compat. The "old" stuff should be removed before 1.0
	// release. The thinking here is this:
	//     !old && !new = use new
	//     !old && new  = use new
	//     old && !new  = use old
	//     old && new   = use new (but warn)
	oldPath := path.Join(kl.getPodDir(podUID), ctrName)
	oldExists := dirExists(oldPath)
	newPath := path.Join(kl.getPodDir(podUID), "containers", ctrName)
	newExists := dirExists(newPath)
	if oldExists && !newExists {
		return oldPath
	}
	if oldExists {
		glog.Warningf("Data dir for pod %q, container %q exists in both old and new form, using new", podUID, ctrName)
	}
	return newPath
}
// dirExists reports whether path names an existing directory.
// Any stat error (including "does not exist") yields false.
func dirExists(path string) bool {
	info, err := os.Stat(path)
	return err == nil && info.IsDir()
}
2015-01-12 00:42:11 +00:00
func ( kl * Kubelet ) setupDataDirs ( ) error {
kl . rootDirectory = path . Clean ( kl . rootDirectory )
2014-11-23 15:47:25 +00:00
if err := os . MkdirAll ( kl . getRootDir ( ) , 0750 ) ; err != nil {
2015-01-12 00:42:11 +00:00
return fmt . Errorf ( "error creating root directory: %v" , err )
}
2014-11-23 15:47:25 +00:00
if err := os . MkdirAll ( kl . getPodsDir ( ) , 0750 ) ; err != nil {
2015-01-12 00:42:11 +00:00
return fmt . Errorf ( "error creating pods directory: %v" , err )
}
2014-11-23 15:47:25 +00:00
if err := os . MkdirAll ( kl . getPluginsDir ( ) , 0750 ) ; err != nil {
return fmt . Errorf ( "error creating plugins directory: %v" , err )
}
2015-01-12 00:42:11 +00:00
return nil
}
// Get a list of pods that have data directories.
2015-01-14 23:22:21 +00:00
func ( kl * Kubelet ) listPodsFromDisk ( ) ( [ ] types . UID , error ) {
2014-11-23 15:47:25 +00:00
podInfos , err := ioutil . ReadDir ( kl . getPodsDir ( ) )
2015-01-12 00:42:11 +00:00
if err != nil {
return nil , err
}
2015-01-14 23:22:21 +00:00
pods := [ ] types . UID { }
2015-01-12 00:42:11 +00:00
for i := range podInfos {
if podInfos [ i ] . IsDir ( ) {
2015-01-14 23:22:21 +00:00
pods = append ( pods , types . UID ( podInfos [ i ] . Name ( ) ) )
2015-01-12 00:42:11 +00:00
}
}
return pods , nil
}
2014-10-28 00:29:55 +00:00
// ByCreated implements sort.Interface over containers, ordering them by
// descending creation time (newest first).
type ByCreated []*docker.Container

func (a ByCreated) Len() int      { return len(a) }
func (a ByCreated) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less reports i before j when i was created after j (newest-first order).
func (a ByCreated) Less(i, j int) bool { return a[i].Created.After(a[j].Created) }
// TODO: these removals are racy, we should make dockerclient threadsafe across List/Inspect transactions.
func ( kl * Kubelet ) purgeOldest ( ids [ ] string ) error {
dockerData := [ ] * docker . Container { }
for _ , id := range ids {
data , err := kl . dockerClient . InspectContainer ( id )
if err != nil {
return err
}
2015-01-07 02:31:40 +00:00
if ! data . State . Running && ( time . Now ( ) . Sub ( data . State . FinishedAt ) > kl . minimumGCAge ) {
2014-10-28 00:29:55 +00:00
dockerData = append ( dockerData , data )
}
}
sort . Sort ( ByCreated ( dockerData ) )
if len ( dockerData ) <= kl . maxContainerCount {
return nil
}
dockerData = dockerData [ kl . maxContainerCount : ]
for _ , data := range dockerData {
if err := kl . dockerClient . RemoveContainer ( docker . RemoveContainerOptions { ID : data . ID } ) ; err != nil {
return err
}
}
return nil
}
2014-12-22 19:54:07 +00:00
// GarbageCollectLoop runs container garbage collection once a minute, forever,
// logging (but not propagating) any errors.
func (kl *Kubelet) GarbageCollectLoop() {
	util.Forever(func() {
		if err := kl.GarbageCollectContainers(); err != nil {
			glog.Errorf("Garbage collect failed: %v", err)
		}
	}, time.Minute*1)
}
2014-10-28 00:29:55 +00:00
// GarbageCollectContainers removes dead containers in two phases: containers
// whose docker names cannot be parsed ("unidentified") are removed immediately
// if not running; identified containers are grouped per (pod UID, container
// name) and trimmed via purgeOldest when a group exceeds maxContainerCount.
// A maxContainerCount of 0 disables garbage collection entirely.
// TODO: Also enforce a maximum total number of containers.
func (kl *Kubelet) GarbageCollectContainers() error {
	if kl.maxContainerCount == 0 {
		return nil
	}
	// List all (including dead) kubelet-managed containers.
	containers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, true)
	if err != nil {
		return err
	}
	type unidentifiedContainer struct {
		// Docker ID.
		id string
		// Docker container name.
		name string
	}
	unidentifiedContainers := make([]unidentifiedContainer, 0)
	// Maps "<uid>.<container name>" to the docker IDs sharing that identity.
	uidToIDMap := map[string][]string{}
	for _, container := range containers {
		_, uid, name, _, err := dockertools.ParseDockerName(container.Names[0])
		if err != nil {
			// Name did not parse; handle separately below.
			unidentifiedContainers = append(unidentifiedContainers, unidentifiedContainer{
				id:   container.ID,
				name: container.Names[0],
			})
			continue
		}
		uidName := string(uid) + "." + name
		uidToIDMap[uidName] = append(uidToIDMap[uidName], container.ID)
	}
	// Remove all non-running unidentified containers.
	for _, container := range unidentifiedContainers {
		data, err := kl.dockerClient.InspectContainer(container.id)
		if err != nil {
			return err
		}
		if data.State.Running {
			continue
		}
		glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id)
		err = kl.dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: container.id})
		if err != nil {
			return err
		}
	}
	// Evict dead containers according to our policies.
	for _, list := range uidToIDMap {
		if len(list) <= kl.maxContainerCount {
			continue
		}
		if err := kl.purgeOldest(list); err != nil {
			return err
		}
	}
	return nil
}
2015-03-03 18:33:25 +00:00
// getPodStatusFromCache returns the cached status for podFullName and whether
// an entry was present. Safe for concurrent use.
func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
	kl.podStatusesLock.RLock()
	defer kl.podStatusesLock.RUnlock()
	status, ok := kl.podStatuses[podFullName]
	return status, ok
}
// setPodStatusInCache stores (or overwrites) the cached status for podFullName.
// Safe for concurrent use.
func (kl *Kubelet) setPodStatusInCache(podFullName string, status api.PodStatus) {
	kl.podStatusesLock.Lock()
	defer kl.podStatusesLock.Unlock()
	kl.podStatuses[podFullName] = status
}
// removeOrphanedStatuses drops cached statuses for any pod whose full name is
// not in podFullNames (the set of currently known pods).
func (kl *Kubelet) removeOrphanedStatuses(podFullNames map[string]bool) {
	kl.podStatusesLock.Lock()
	defer kl.podStatusesLock.Unlock()
	for key := range kl.podStatuses {
		if _, ok := podFullNames[key]; !ok {
			glog.V(5).Infof("Removing %q from status map.", key)
			delete(kl.podStatuses, key)
		}
	}
}
2014-07-15 20:24:41 +00:00
// Run starts the kubelet reacting to config updates. Optional collaborators
// (log server, docker puller) are defaulted if unset, the node status sync
// goroutine is started, and then the sync loop processes updates.
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.dockerPuller == nil {
		kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
	}
	if kl.kubeClient == nil {
		glog.Warning("No api server defined - no node status update will be sent.")
	}
	// syncNodeStatus returns immediately when kubeClient is nil.
	go kl.syncNodeStatus()
	kl.syncLoop(updates, kl)
}
2015-02-23 21:04:45 +00:00
// syncNodeStatus periodically synchronizes node status to master. Updates
// start at initialNodeStatusUpdateFrequency and ramp up linearly (for faster
// cluster startup) until kl.statusUpdateFrequency is reached, after which
// updates continue at that steady-state frequency forever. No-op when there
// is no API server client.
func (kl *Kubelet) syncNodeStatus() {
	if kl.kubeClient == nil {
		return
	}
	// Ramp-up phase. A plain time.Sleep replaces the original redundant
	// single-case select over time.After — same behavior, no timer allocation.
	for feq := initialNodeStatusUpdateFrequency; feq < kl.statusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
		time.Sleep(feq)
		if err := kl.updateNodeStatus(); err != nil {
			glog.Errorf("Unable to update node status: %v", err)
		}
	}
	// Steady state.
	for {
		time.Sleep(kl.statusUpdateFrequency)
		if err := kl.updateNodeStatus(); err != nil {
			glog.Errorf("Unable to update node status: %v", err)
		}
	}
}
2014-10-08 19:56:02 +00:00
func makeBinds ( pod * api . BoundPod , container * api . Container , podVolumes volumeMap ) [ ] string {
2014-06-06 23:40:48 +00:00
binds := [ ] string { }
2014-08-27 05:08:06 +00:00
for _ , mount := range container . VolumeMounts {
vol , ok := podVolumes [ mount . Name ]
if ! ok {
continue
2014-06-19 23:59:48 +00:00
}
2014-08-27 05:08:06 +00:00
b := fmt . Sprintf ( "%s:%s" , vol . GetPath ( ) , mount . MountPath )
if mount . ReadOnly {
b += ":ro"
2014-06-06 23:40:48 +00:00
}
2014-08-27 05:08:06 +00:00
binds = append ( binds , b )
2014-06-06 23:40:48 +00:00
}
2014-08-27 05:08:06 +00:00
return binds
2014-06-09 20:47:25 +00:00
}
2015-02-23 21:04:45 +00:00
2014-06-09 20:47:25 +00:00
func makePortsAndBindings ( container * api . Container ) ( map [ docker . Port ] struct { } , map [ docker . Port ] [ ] docker . PortBinding ) {
2014-06-06 23:40:48 +00:00
exposedPorts := map [ docker . Port ] struct { } { }
portBindings := map [ docker . Port ] [ ] docker . PortBinding { }
for _ , port := range container . Ports {
exteriorPort := port . HostPort
2014-08-19 22:18:49 +00:00
if exteriorPort == 0 {
// No need to do port binding when HostPort is not specified
continue
}
interiorPort := port . ContainerPort
2014-06-06 23:40:48 +00:00
// Some of this port stuff is under-documented voodoo.
// See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api
2014-06-16 04:19:35 +00:00
var protocol string
2014-09-28 03:31:37 +00:00
switch strings . ToUpper ( string ( port . Protocol ) ) {
2014-07-08 04:32:56 +00:00
case "UDP" :
2014-06-16 04:19:35 +00:00
protocol = "/udp"
2014-07-08 04:32:56 +00:00
case "TCP" :
2014-06-16 04:19:35 +00:00
protocol = "/tcp"
default :
2015-01-06 00:38:47 +00:00
glog . Warningf ( "Unknown protocol %q: defaulting to TCP" , port . Protocol )
2014-06-16 04:19:35 +00:00
protocol = "/tcp"
}
dockerPort := docker . Port ( strconv . Itoa ( interiorPort ) + protocol )
2014-06-06 23:40:48 +00:00
exposedPorts [ dockerPort ] = struct { } { }
portBindings [ dockerPort ] = [ ] docker . PortBinding {
2014-06-12 21:09:40 +00:00
{
2014-06-06 23:40:48 +00:00
HostPort : strconv . Itoa ( exteriorPort ) ,
2014-12-18 19:15:35 +00:00
HostIP : port . HostIP ,
2014-06-06 23:40:48 +00:00
} ,
}
}
2014-06-09 20:47:25 +00:00
return exposedPorts , portBindings
}
2015-01-05 21:16:18 +00:00
func milliCPUToShares ( milliCPU int64 ) int64 {
2014-07-29 18:34:16 +00:00
if milliCPU == 0 {
// zero milliCPU means unset. Use kernel default.
return 0
}
2014-07-16 09:46:22 +00:00
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
shares := ( milliCPU * sharesPerCPU ) / milliCPUToCPU
2014-07-15 23:49:34 +00:00
if shares < minShares {
return minShares
2014-06-19 12:29:42 +00:00
}
2014-07-15 23:49:34 +00:00
return shares
2014-06-19 12:29:42 +00:00
}
2014-12-15 13:08:08 +00:00
// makeCapabilites converts the add/drop capability lists into plain string
// slices (nil is preserved for empty input lists).
func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType) ([]string, []string) {
	toStrings := func(caps []api.CapabilityType) []string {
		var out []string
		for _, c := range caps {
			out = append(out, string(c))
		}
		return out
	}
	return toStrings(capAdd), toStrings(capDrop)
}
2014-09-03 20:39:56 +00:00
// actionHandler is a basic interface that knows how to execute handlers.
type actionHandler interface {
	// Run executes the handler against the named container of the given pod.
	Run(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error
}
// newActionHandler returns the actionHandler implementation matching whichever
// field of handler is populated, or nil (with an error logged) for an
// unrecognized handler.
func (kl *Kubelet) newActionHandler(handler *api.Handler) actionHandler {
	switch {
	case handler.Exec != nil:
		return &execActionHandler{kubelet: kl}
	case handler.HTTPGet != nil:
		return &httpActionHandler{client: kl.httpClient, kubelet: kl}
	default:
		glog.Errorf("Invalid handler: %v", handler)
		return nil
	}
}
2015-01-14 23:22:21 +00:00
// runHandler executes the given lifecycle handler against the named container,
// returning an error if the handler is not of a recognized type.
func (kl *Kubelet) runHandler(podFullName string, uid types.UID, container *api.Container, handler *api.Handler) error {
	actionHandler := kl.newActionHandler(handler)
	if actionHandler == nil {
		return fmt.Errorf("invalid handler")
	}
	return actionHandler.Run(podFullName, uid, container, handler)
}
2014-11-04 00:16:31 +00:00
// fieldPath returns a fieldPath locating container within pod.
// Returns an error if the container isn't part of the pod.
func fieldPath ( pod * api . BoundPod , container * api . Container ) ( string , error ) {
for i := range pod . Spec . Containers {
here := & pod . Spec . Containers [ i ]
2014-12-10 01:06:39 +00:00
if here . Name == container . Name {
2014-12-30 01:10:38 +00:00
if here . Name == "" {
return fmt . Sprintf ( "spec.containers[%d]" , i ) , nil
} else {
return fmt . Sprintf ( "spec.containers{%s}" , here . Name ) , nil
}
2014-11-04 00:16:31 +00:00
}
}
return "" , fmt . Errorf ( "container %#v not found in pod %#v" , container , pod )
}
// containerRef returns an *api.ObjectReference which references the given container within the
// given pod. Returns an error if the reference can't be constructed or the container doesn't
// actually belong to the pod.
// TODO: Pods that came to us by static config or over HTTP have no selfLink set, which makes
// this fail and log an error. Figure out how we want to identify these pods to the rest of the
// system.
func containerRef(pod *api.BoundPod, container *api.Container) (*api.ObjectReference, error) {
	fieldPath, err := fieldPath(pod, container)
	if err != nil {
		// TODO: figure out intelligent way to refer to containers that we implicitly
		// start (like the pod infra container). This is not a good way, ugh.
		fieldPath = "implicitly required container " + container.Name
	}
	ref, err := api.GetPartialReference(pod, fieldPath)
	if err != nil {
		return nil, err
	}
	return ref, nil
}
// setRef stores a reference to a pod's container, associating it with the given docker id.
func (kl *Kubelet) setRef(id dockertools.DockerID, ref *api.ObjectReference) {
	kl.refLock.Lock()
	defer kl.refLock.Unlock()
	// Lazily initialize the map; a Kubelet constructed without NewMainKubelet
	// may not have done so.
	if kl.dockerIDToRef == nil {
		kl.dockerIDToRef = map[dockertools.DockerID]*api.ObjectReference{}
	}
	kl.dockerIDToRef[id] = ref
}
// clearRef drops the container reference recorded for the given docker id.
func (kl *Kubelet) clearRef(id dockertools.DockerID) {
	kl.refLock.Lock()
	defer kl.refLock.Unlock()
	// delete is a no-op when the id is absent (or the map is nil).
	delete(kl.dockerIDToRef, id)
}
// getRef looks up the container reference stored for the given docker id.
// The boolean result reports whether a reference was found.
func (kl *Kubelet) getRef(id dockertools.DockerID) (*api.ObjectReference, bool) {
	kl.refLock.RLock()
	defer kl.refLock.RUnlock()
	r, found := kl.dockerIDToRef[id]
	return r, found
}
2014-07-15 20:24:41 +00:00
// runContainer creates and starts a single docker container for the given
// container spec of a pod, wires up its environment, volumes, ports, DNS and
// termination-log bind mount, and fires lifecycle events along the way.
// Returns the docker container ID of the started container.
// netMode/ipcMode are passed through to docker's HostConfig (e.g.
// "container:<infra-id>" to share the infra container's namespaces).
func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (id dockertools.DockerID, err error) {
	ref, err := containerRef(pod, container)
	if err != nil {
		// A missing ref only disables event recording below; it is not fatal.
		glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
	}

	envVariables, err := kl.makeEnvironmentVariables(pod.Namespace, container)
	if err != nil {
		return "", err
	}

	binds := makeBinds(pod, container, podVolumes)
	exposedPorts, portBindings := makePortsAndBindings(container)

	opts := docker.CreateContainerOptions{
		// The docker name encodes pod UID, pod full name and a hash of the
		// container spec, so a changed spec is detectable later.
		Name: dockertools.BuildDockerName(pod.UID, GetPodFullName(pod), container),
		Config: &docker.Config{
			Cmd:          container.Command,
			Env:          envVariables,
			ExposedPorts: exposedPorts,
			Hostname:     pod.Name,
			Image:        container.Image,
			Memory:       container.Resources.Limits.Memory().Value(),
			CPUShares:    milliCPUToShares(container.Resources.Limits.Cpu().MilliValue()),
			WorkingDir:   container.WorkingDir,
		},
	}
	dockerContainer, err := kl.dockerClient.CreateContainer(opts)
	if err != nil {
		if ref != nil {
			kl.recorder.Eventf(ref, "failed", "Failed to create docker container with error: %v", err)
		}
		return "", err
	}
	// Remember this reference so we can report events about this container
	if ref != nil {
		kl.setRef(dockertools.DockerID(dockerContainer.ID), ref)
		kl.recorder.Eventf(ref, "created", "Created with docker id %v", dockerContainer.ID)
	}

	if len(container.TerminationMessagePath) != 0 {
		// Pre-create the host-side termination-log file and bind-mount it at
		// the path the container will write its termination message to.
		p := kl.getPodContainerDir(pod.UID, container.Name)
		if err := os.MkdirAll(p, 0750); err != nil {
			glog.Errorf("Error on creating %q: %v", p, err)
		} else {
			containerLogPath := path.Join(p, dockerContainer.ID)
			fs, err := os.Create(containerLogPath)
			if err != nil {
				// TODO: Clean up the previously created dir? return the error?
				glog.Errorf("Error on creating termination-log file %q: %v", containerLogPath, err)
			} else {
				fs.Close() // Close immediately; we're just doing a `touch` here
				b := fmt.Sprintf("%s:%s", containerLogPath, container.TerminationMessagePath)
				binds = append(binds, b)
			}
		}
	}

	// Privileged mode is honored only when globally allowed; a request for it
	// while disallowed is a hard error.
	privileged := false
	if capabilities.Get().AllowPrivileged {
		privileged = container.Privileged
	} else if container.Privileged {
		return "", fmt.Errorf("container requested privileged mode, but it is disallowed globally.")
	}

	capAdd, capDrop := makeCapabilites(container.Capabilities.Add, container.Capabilities.Drop)
	hc := &docker.HostConfig{
		PortBindings: portBindings,
		Binds:        binds,
		NetworkMode:  netMode,
		IpcMode:      ipcMode,
		Privileged:   privileged,
		CapAdd:       capAdd,
		CapDrop:      capDrop,
	}
	if pod.Spec.DNSPolicy == api.DNSClusterFirst {
		if err := kl.applyClusterDNS(hc, pod); err != nil {
			return "", err
		}
	}
	err = kl.dockerClient.StartContainer(dockerContainer.ID, hc)
	if err != nil {
		if ref != nil {
			kl.recorder.Eventf(ref, "failed",
				"Failed to start with docker id %v with error: %v", dockerContainer.ID, err)
		}
		return "", err
	}
	if ref != nil {
		kl.recorder.Eventf(ref, "started", "Started with docker id %v", dockerContainer.ID)
	}
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		handlerErr := kl.runHandler(GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart)
		if handlerErr != nil {
			// A failed PostStart hook kills the freshly started container.
			kl.killContainerByID(dockerContainer.ID)
			return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
		}
	}
	return dockertools.DockerID(dockerContainer.ID), err
}
// masterServices names the master services whose env vars every pod should
// see regardless of its own namespace (subject to masterServiceNamespace).
var masterServices = util.NewStringSet("kubernetes", "kubernetes-ro")
// getServiceEnvVarMap makes a map[string]string of env vars for services a
// pod in namespace ns should see: all services in ns, plus the master
// services from masterServiceNamespace (unless shadowed by a same-named
// service in ns).
func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
	var (
		serviceMap = make(map[string]api.Service)
		m          = make(map[string]string)
	)

	// Get all service resources from the master (via a cache),
	// and populate them into service environment variables.
	if kl.serviceLister == nil {
		// Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
		return m, nil
	}
	services, err := kl.serviceLister.List()
	if err != nil {
		return m, fmt.Errorf("failed to list services when setting up env vars.")
	}

	// project the services in namespace ns onto the master services
	for _, service := range services.Items {
		serviceName := service.Name

		switch service.Namespace {
		// for the case whether the master service namespace is the namespace the pod
		// is in, the pod should receive all the services in the namespace.
		//
		// ordering of the case clauses below enforces this: when ns equals the
		// master service namespace, the first case matches and takes everything.
		case ns:
			serviceMap[serviceName] = service
		case kl.masterServiceNamespace:
			if masterServices.Has(serviceName) {
				_, exists := serviceMap[serviceName]
				// Only add if not shadowed by a same-named service in ns.
				if !exists {
					serviceMap[serviceName] = service
				}
			}
		}
	}
	// Rebuild the service list from the projected map, then render it to env vars.
	services.Items = []api.Service{}
	for _, service := range serviceMap {
		services.Items = append(services.Items, service)
	}

	for _, e := range envvars.FromServices(&services) {
		m[e.Name] = e.Value
	}
	return m, nil
}
// Make the service environment variables for a pod in the given namespace.
func ( kl * Kubelet ) makeEnvironmentVariables ( ns string , container * api . Container ) ( [ ] string , error ) {
var result [ ] string
// Note: These are added to the docker.Config, but are not included in the checksum computed
// by dockertools.BuildDockerName(...). That way, we can still determine whether an
// api.Container is already running by its hash. (We don't want to restart a container just
// because some service changed.)
//
// Note that there is a race between Kubelet seeing the pod and kubelet seeing the service.
// To avoid this users can: (1) wait between starting a service and starting; or (2) detect
// missing service env var and exit and be restarted; or (3) use DNS instead of env vars
// and keep trying to resolve the DNS name of the service (recommended).
serviceEnv , err := kl . getServiceEnvVarMap ( ns )
if err != nil {
return result , err
}
for _ , value := range container . Env {
2015-03-12 14:39:22 +00:00
// Accesses apiserver+Pods.
2015-01-08 15:25:14 +00:00
// So, the master may set service env vars, or kubelet may. In case both are doing
// it, we delete the key from the kubelet-generated ones so we don't have duplicate
// env vars.
// TODO: remove this net line once all platforms use apiserver+Pods.
delete ( serviceEnv , value . Name )
result = append ( result , fmt . Sprintf ( "%s=%s" , value . Name , value . Value ) )
}
// Append remaining service env vars.
for k , v := range serviceEnv {
result = append ( result , fmt . Sprintf ( "%s=%s" , k , v ) )
}
return result , nil
}
2014-11-12 05:21:40 +00:00
// applyClusterDNS fills in the DNS fields of hc for the given pod: the
// cluster DNS server and cluster search domains are placed first, followed by
// whatever the host's /etc/resolv.conf provides.
func (kl *Kubelet) applyClusterDNS(hc *docker.HostConfig, pod *api.BoundPod) error {
	// Get host DNS settings and append them to cluster DNS settings.
	f, err := os.Open("/etc/resolv.conf")
	if err != nil {
		return err
	}
	defer f.Close()

	hostNameservers, hostSearch, err := parseResolvConf(f)
	if err != nil {
		return err
	}

	if kl.clusterDNS != nil {
		hc.DNS = append([]string{kl.clusterDNS.String()}, hostNameservers...)
	}
	if kl.clusterDomain != "" {
		// Search the pod's namespace subdomain before the bare cluster domain.
		namespaced := pod.Namespace + "." + kl.clusterDomain
		hc.DNSSearch = append([]string{namespaced, kl.clusterDomain}, hostSearch...)
	}
	return nil
}
// Returns the list of DNS servers and DNS search domains.
func parseResolvConf ( reader io . Reader ) ( nameservers [ ] string , searches [ ] string , err error ) {
file , err := ioutil . ReadAll ( reader )
if err != nil {
return nil , nil , err
}
// Lines of the form "nameserver 1.2.3.4" accumulate.
nameservers = [ ] string { }
// Lines of the form "search example.com" overrule - last one wins.
searches = [ ] string { }
lines := strings . Split ( string ( file ) , "\n" )
for l := range lines {
trimmed := strings . TrimSpace ( lines [ l ] )
if strings . HasPrefix ( trimmed , "#" ) {
continue
}
fields := strings . Fields ( trimmed )
if len ( fields ) == 0 {
continue
}
if fields [ 0 ] == "nameserver" {
nameservers = append ( nameservers , fields [ 1 : ] ... )
}
if fields [ 0 ] == "search" {
searches = fields [ 1 : ]
}
}
return nameservers , searches , nil
}
2014-06-25 23:24:20 +00:00
// killContainer kills the given docker container by delegating to
// killContainerByID with its docker ID.
func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
	return kl.killContainerByID(dockerContainer.ID)
}
2015-03-04 01:33:48 +00:00
func ( kl * Kubelet ) killContainerByID ( ID string ) error {
glog . V ( 2 ) . Infof ( "Killing container with id %q" , ID )
2015-02-02 18:51:52 +00:00
kl . readiness . remove ( ID )
2014-09-03 20:39:56 +00:00
err := kl . dockerClient . StopContainer ( ID , 10 )
2014-09-24 21:35:34 +00:00
2014-11-04 00:16:31 +00:00
ref , ok := kl . getRef ( dockertools . DockerID ( ID ) )
if ! ok {
2015-03-04 01:33:48 +00:00
glog . Warningf ( "No ref for pod '%v'" , ID )
2014-11-04 00:16:31 +00:00
} else {
// TODO: pass reason down here, and state, or move this call up the stack.
2015-03-04 01:33:48 +00:00
kl . recorder . Eventf ( ref , "killing" , "Killing %v" , ID )
2014-11-04 00:16:31 +00:00
}
2014-06-06 23:40:48 +00:00
return err
}
2014-07-19 07:09:43 +00:00
const (
	// PodInfraContainerImage is the default image for the pod infra ("pause")
	// container that createPodInfraContainer starts for each pod; other
	// containers join its network/IPC namespaces via "container:<id>" modes.
	PodInfraContainerImage = "kubernetes/pause:latest"
)
2014-06-20 03:30:42 +00:00
2015-01-21 00:59:26 +00:00
// createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
// The infra container exposes the union of all container ports in the pod and
// is given a lowered OOM score so it outlives the pod's other containers.
func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.DockerID, error) {
	var ports []api.ContainerPort
	// Docker only exports ports from the pod infra container. Let's
	// collect all of the relevant ports and export them.
	for _, container := range pod.Spec.Containers {
		ports = append(ports, container.Ports...)
	}
	// Synthesize the infra container spec.
	container := &api.Container{
		Name:  dockertools.PodInfraContainerName,
		Image: kl.podInfraContainerImage,
		Ports: ports,
	}
	ref, err := containerRef(pod, container)
	if err != nil {
		// Missing ref only disables event recording; not fatal.
		glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
	}
	// TODO: make this a TTL based pull (if image older than X policy, pull)
	ok, err := kl.dockerPuller.IsImagePresent(container.Image)
	if err != nil {
		if ref != nil {
			kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q", container.Image)
		}
		return "", err
	}
	if !ok {
		if err := kl.pullImage(container.Image, ref); err != nil {
			return "", err
		}
	}
	// NOTE(review): a "pulled" event is emitted even when the image was
	// already present and no pull happened — confirm this is intended.
	if ref != nil {
		kl.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
	}
	// No volumes and default ("" ) network/IPC modes for the infra container.
	id, err := kl.runContainer(pod, container, nil, "", "")
	if err != nil {
		return "", err
	}
	// Set OOM score of POD container to lower than those of the other
	// containers in the pod. This ensures that it is killed only as a last
	// resort.
	containerInfo, err := kl.dockerClient.InspectContainer(string(id))
	if err != nil {
		return "", err
	}
	// Ensure the PID actually exists, else we'll move ourselves.
	if containerInfo.State.Pid == 0 {
		return "", fmt.Errorf("failed to get init PID for Docker pod infra container %q", string(id))
	}
	return id, util.ApplyOomScoreAdj(containerInfo.State.Pid, podOomScoreAdj)
}
2014-12-22 19:54:07 +00:00
func ( kl * Kubelet ) pullImage ( img string , ref * api . ObjectReference ) error {
2015-02-24 00:22:12 +00:00
start := time . Now ( )
defer func ( ) {
2015-02-24 23:29:18 +00:00
metrics . ImagePullLatency . Observe ( metrics . SinceInMicroseconds ( start ) )
2015-02-24 00:22:12 +00:00
} ( )
2014-12-22 19:54:07 +00:00
if err := kl . dockerPuller . Pull ( img ) ; err != nil {
if ref != nil {
2015-03-03 06:06:20 +00:00
kl . recorder . Eventf ( ref , "failed" , "Failed to pull image %q" , img )
2014-12-22 19:54:07 +00:00
}
return err
}
2015-01-08 01:00:57 +00:00
if ref != nil {
2015-03-03 06:06:20 +00:00
kl . recorder . Eventf ( ref , "pulled" , "Successfully pulled image %q" , img )
2015-01-08 01:00:57 +00:00
}
2014-12-22 19:54:07 +00:00
return nil
}
2014-10-21 05:03:23 +00:00
// Kill all containers in a pod. Returns the number of containers deleted and an error if one occurs.
func ( kl * Kubelet ) killContainersInPod ( pod * api . BoundPod , dockerContainers dockertools . DockerContainers ) ( int , error ) {
podFullName := GetPodFullName ( pod )
2014-08-08 04:49:17 +00:00
count := 0
2014-10-08 19:56:02 +00:00
errs := make ( chan error , len ( pod . Spec . Containers ) )
2014-08-08 04:49:17 +00:00
wg := sync . WaitGroup { }
2014-10-08 19:56:02 +00:00
for _ , container := range pod . Spec . Containers {
2014-10-22 04:37:15 +00:00
// TODO: Consider being more aggressive: kill all containers with this pod UID, period.
2014-10-08 19:56:02 +00:00
if dockerContainer , found , _ := dockerContainers . FindPodContainer ( podFullName , pod . UID , container . Name ) ; found {
2014-08-08 04:49:17 +00:00
count ++
wg . Add ( 1 )
go func ( ) {
2015-02-28 00:32:40 +00:00
defer util . HandleCrash ( )
2014-08-08 04:49:17 +00:00
err := kl . killContainer ( dockerContainer )
if err != nil {
2015-01-06 00:38:47 +00:00
glog . Errorf ( "Failed to delete container: %v; Skipping pod %q" , err , podFullName )
2014-08-08 04:49:17 +00:00
errs <- err
}
wg . Done ( )
} ( )
}
}
wg . Wait ( )
close ( errs )
if len ( errs ) > 0 {
errList := [ ] error { }
for err := range errs {
errList = append ( errList , err )
}
return - 1 , fmt . Errorf ( "failed to delete containers (%v)" , errList )
}
return count , nil
}
2014-07-18 18:42:47 +00:00
// empty is a zero-size placeholder value type, used to build sets as map[T]empty.
type empty struct{}
2015-03-04 01:33:48 +00:00
// makePodDataDirs ensures the pod's on-disk directory structure exists: the
// pod dir itself plus its volumes and plugins subdirectories.
func (kl *Kubelet) makePodDataDirs(pod *api.BoundPod) error {
	uid := pod.UID
	// MkdirAll succeeds if the directory already exists, creates any missing
	// parents, and (unlike Mkdir+IsExist) errors if the path exists but is
	// not a directory.
	if err := os.MkdirAll(kl.getPodDir(uid), 0750); err != nil {
		return err
	}
	if err := os.MkdirAll(kl.getPodVolumesDir(uid), 0750); err != nil {
		return err
	}
	if err := os.MkdirAll(kl.getPodPluginsDir(uid), 0750); err != nil {
		return err
	}
	return nil
}
2015-03-09 23:55:51 +00:00
// shouldContainerBeRestarted reports whether a container that is not
// currently running should be (re)started, based on the pod's RestartPolicy
// and the container's recent docker history. As a side effect it clears the
// readiness state of all recent instances of the container.
func (kl *Kubelet) shouldContainerBeRestarted(container *api.Container, pod *api.BoundPod) bool {
	podFullName := GetPodFullName(pod)

	// Check RestartPolicy for dead container
	recentContainers, err := dockertools.GetRecentDockerContainersWithNameAndUUID(kl.dockerClient, podFullName, pod.UID, container.Name)
	if err != nil {
		// On a listing error we fall through with whatever was returned,
		// which may lead to a restart decision on partial data.
		glog.Errorf("Error listing recent containers for pod %q: %v", podFullName, err)
		// TODO(dawnchen): error handling here?
	}

	// set dead containers to unready state
	for _, c := range recentContainers {
		kl.readiness.remove(c.ID)
	}

	if len(recentContainers) > 0 {
		// RestartPolicy Never: any past run, regardless of outcome, means no restart.
		if pod.Spec.RestartPolicy.Never != nil {
			glog.Infof("Already ran container %q of pod %q, do nothing", container.Name, podFullName)
			return false
		}
		if pod.Spec.RestartPolicy.OnFailure != nil {
			// Check the exit code of last run
			// NOTE(review): assumes recentContainers[0] is the most recent
			// run — confirm ordering guaranteed by
			// GetRecentDockerContainersWithNameAndUUID.
			if recentContainers[0].State.ExitCode == 0 {
				glog.Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, podFullName)
				return false
			}
		}
	}
	// RestartPolicy Always, a failed last run under OnFailure, or no history.
	return true
}
// getPodInfraContainer looks up the infra container for the pod identified by
// podFullName and uid within dockerContainers. It returns the container's
// docker ID and true when found, otherwise an empty ID and false.
func (kl *Kubelet) getPodInfraContainer(podFullName string, uid types.UID,
	dockerContainers dockertools.DockerContainers) (dockertools.DockerID, bool) {
	infra, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName)
	if !found {
		return "", false
	}
	return dockertools.DockerID(infra.ID), true
}
// pullImageAndRunContainer attempts to start a container, pulling its image
// first when the pull policy calls for it. It returns the DockerID of the
// started container on success, and a non-nil error otherwise.
func (kl *Kubelet) pullImageAndRunContainer(pod *api.BoundPod, container *api.Container, podVolumes *volumeMap,
	podInfraContainerID dockertools.DockerID) (dockertools.DockerID, error) {
	podFullName := GetPodFullName(pod)
	ref, err := containerRef(pod, container)
	if err != nil {
		// A missing ref only disables event recording; not fatal.
		glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
	}
	if container.ImagePullPolicy != api.PullNever {
		present, err := kl.dockerPuller.IsImagePresent(container.Image)
		if err != nil {
			if ref != nil {
				kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q", container.Image)
			}
			glog.Errorf("Failed to inspect image %q: %v; skipping pod %q container %q", container.Image, err, podFullName, container.Name)
			return "", err
		}
		// Pull when policy is Always, or IfNotPresent with the image absent.
		if container.ImagePullPolicy == api.PullAlways ||
			(container.ImagePullPolicy == api.PullIfNotPresent && (!present)) {
			if err := kl.pullImage(container.Image, ref); err != nil {
				return "", err
			}
		}
	}
	// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
	// Join the infra container's network and IPC namespaces.
	namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
	containerID, err := kl.runContainer(pod, container, *podVolumes, namespaceMode, namespaceMode)
	if err != nil {
		// TODO(bburns) : Perhaps blacklist a container after N failures?
		glog.Errorf("Error running pod %q container %q: %v", podFullName, container.Name, err)
		return "", err
	}
	return containerID, nil
}
2015-03-10 14:09:55 +00:00
// podContainerChangesSpec describes the changes that need to happen for a pod. The semantics are as follows:
// - startInfraContainer is true if a new Infra Container has to be started and the old one (if running) killed.
//   Additionally, if it is true then containersToKeep has to be empty.
// - infraContainerId has to be set iff startInfraContainer is false. It stores the dockerID of the running Infra Container.
// - containersToStart keeps indices of Specs of containers that have to be started.
// - containersToKeep stores a mapping from dockerIDs of running containers to indices of their Specs for containers that
//   should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1).
//   It should never be the case that containersToStart is empty and containersToKeep contains only infraContainerId. In such a case
//   the Infra Container should be killed, hence it's removed from this map.
// - all running containers which are NOT contained in containersToKeep should be killed.
type podContainerChangesSpec struct {
	startInfraContainer bool
	infraContainerId    dockertools.DockerID
	containersToStart   map[int]empty
	containersToKeep    map[dockertools.DockerID]int
}
// computePodContainerChanges compares the pod's spec against the currently
// running docker containers and decides what must change: whether the infra
// container must be (re)created, which containers to start, and which running
// containers to keep. See podContainerChangesSpec for the invariants of the
// returned value.
func (kl *Kubelet) computePodContainerChanges(pod *api.BoundPod, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) {
	podFullName := GetPodFullName(pod)
	uid := pod.UID
	glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)

	// Make sure the pod's data directories exist before anything else.
	err := kl.makePodDataDirs(pod)
	if err != nil {
		return podContainerChangesSpec{}, err
	}

	containersToStart := make(map[int]empty)
	containersToKeep := make(map[dockertools.DockerID]int)
	createPodInfraContainer := false

	var podStatus api.PodStatus
	podInfraContainerID, found := kl.getPodInfraContainer(podFullName, uid, containersInPod)
	if found {
		glog.V(4).Infof("Found infra pod for %q", podFullName)
		// The infra container is marked with the sentinel index -1.
		containersToKeep[podInfraContainerID] = -1
		podStatus, err = kl.GetPodStatus(podFullName, uid)
		if err != nil {
			// Status is only used for probing below; proceed with its zero value.
			glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
		}
	} else {
		glog.V(2).Infof("No Infra Container for %q found. All containers will be restarted.", podFullName)
		createPodInfraContainer = true
	}

	for index, container := range pod.Spec.Containers {
		expectedHash := dockertools.HashContainer(&container)
		if dockerContainer, found, hash := containersInPod.FindPodContainer(podFullName, uid, container.Name); found {
			containerID := dockertools.DockerID(dockerContainer.ID)
			glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)

			if !createPodInfraContainer {
				// look for changes in the container. A hash of 0 means the
				// docker name carried no hash; treat that as unchanged.
				containerChanged := hash != 0 && hash != expectedHash
				if !containerChanged {
					result, err := kl.probeContainer(pod, podStatus, container, dockerContainer)
					if err != nil {
						// TODO(vmarmol): examine this logic.
						// NOTE(review): a probe *error* keeps the container
						// running — confirm that is intended.
						glog.Infof("probe no-error: %s", container.Name)
						containersToKeep[containerID] = index
						continue
					}
					if result == probe.Success {
						glog.Infof("probe success: %s", container.Name)
						containersToKeep[containerID] = index
						continue
					}
					// Unhealthy: schedule a restart of this container.
					glog.Infof("pod %q container %q is unhealthy (probe result: %v). Container will be killed and re-created.", podFullName, container.Name, result)
					containersToStart[index] = empty{}
				} else {
					glog.Infof("pod %q container %q hash changed (%d vs %d). Pod will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
					createPodInfraContainer = true
					// Remove the infra sentinel before copying indices below.
					delete(containersToKeep, podInfraContainerID)
					// If we are to restart Infra Container then we move containersToKeep into containersToStart
					// if RestartPolicy allows restarting failed containers.
					if pod.Spec.RestartPolicy.Never == nil {
						for _, v := range containersToKeep {
							containersToStart[v] = empty{}
						}
					}
					containersToStart[index] = empty{}
					containersToKeep = make(map[dockertools.DockerID]int)
				}
			} else { // createPodInfraContainer == true and Container exists
				// If we're creating the infra container everything will be killed anyway.
				// If RestartPolicy is Always or OnFailure we restart containers that were running before we
				// killed them when restarting Infra Container.
				if pod.Spec.RestartPolicy.Never == nil {
					glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name)
					containersToStart[index] = empty{}
				}
				continue
			}
		} else {
			if kl.shouldContainerBeRestarted(&container, pod) {
				// If we are here it means that the container is dead and should be restarted, or never existed and should
				// be created. We may be inserting this ID again if the container has changed and it has
				// RestartPolicy::Always, but it's not a big deal.
				glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
				containersToStart[index] = empty{}
			}
		}
	}

	// After the loop one of the following should be true:
	// - createPodInfraContainer is true and containersToKeep is empty
	// - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container

	// If Infra container is the last running one, we don't want to keep it.
	if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 {
		containersToKeep = make(map[dockertools.DockerID]int)
	}

	return podContainerChangesSpec{
		startInfraContainer: createPodInfraContainer,
		infraContainerId:    podInfraContainerID,
		containersToStart:   containersToStart,
		containersToKeep:    containersToKeep,
	}, nil
}
func ( kl * Kubelet ) syncPod ( pod * api . BoundPod , containersInPod dockertools . DockerContainers ) error {
podFullName := GetPodFullName ( pod )
uid := pod . UID
containerChanges , err := kl . computePodContainerChanges ( pod , containersInPod )
glog . V ( 3 ) . Infof ( "Got container changes for pod %q: %+v" , podFullName , containerChanges )
if err != nil {
return err
}
if containerChanges . startInfraContainer || ( len ( containerChanges . containersToKeep ) == 0 && len ( containerChanges . containersToStart ) == 0 ) {
if len ( containerChanges . containersToKeep ) == 0 && len ( containerChanges . containersToStart ) == 0 {
glog . V ( 4 ) . Infof ( "Killing Infra Container for %q becase all other containers are dead." , podFullName )
} else {
glog . V ( 4 ) . Infof ( "Killing Infra Container for %q, will start new one" , podFullName )
}
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
if podInfraContainer , found , _ := containersInPod . FindPodContainer ( podFullName , uid , dockertools . PodInfraContainerName ) ; found {
if err := kl . killContainer ( podInfraContainer ) ; err != nil {
glog . Warningf ( "Failed to kill pod infra container %q: %v" , podInfraContainer . ID , err )
}
}
_ , err = kl . killContainersInPod ( pod , containersInPod )
if err != nil {
return err
}
} else {
// Otherwise kill any containers in this pod which are not specified as ones to keep.
for id , container := range containersInPod {
_ , keep := containerChanges . containersToKeep [ id ]
if ! keep {
glog . V ( 3 ) . Infof ( "Killing unwanted container %+v" , container )
err = kl . killContainer ( container )
if err != nil {
glog . Errorf ( "Error killing container: %v" , err )
}
}
2015-03-04 15:02:49 +00:00
}
2015-03-04 01:33:48 +00:00
}
2015-03-10 14:09:55 +00:00
// Starting phase: if we should create infra container then we do it first
var ref * api . ObjectReference
var podVolumes volumeMap
podInfraContainerID := containerChanges . infraContainerId
if containerChanges . startInfraContainer && ( len ( containerChanges . containersToStart ) > 0 ) {
ref , err = api . GetReference ( pod )
if err != nil {
glog . Errorf ( "Couldn't make a ref to pod %q: '%v'" , podFullName , err )
}
glog . Infof ( "Creating pod infra container for %q" , podFullName )
podInfraContainerID , err = kl . createPodInfraContainer ( pod )
2015-03-04 01:33:48 +00:00
if err != nil {
2015-03-10 14:09:55 +00:00
glog . Errorf ( "Failed to create pod infra container: %v; Skipping pod %q" , err , podFullName )
return err
2014-07-18 18:42:47 +00:00
}
2014-06-06 23:40:48 +00:00
}
2014-07-18 18:42:47 +00:00
2015-03-10 14:09:55 +00:00
// Mount volumes
podVolumes , err = kl . mountExternalVolumes ( pod )
if err != nil {
if ref != nil {
kl . recorder . Eventf ( ref , "failedMount" ,
"Unable to mount volumes for pod %q: %v" , podFullName , err )
}
glog . Errorf ( "Unable to mount volumes for pod %q: %v; skipping pod" , podFullName , err )
return err
}
// Start everything
for container := range containerChanges . containersToStart {
glog . V ( 4 ) . Infof ( "Creating container %+v" , pod . Spec . Containers [ container ] )
kl . pullImageAndRunContainer ( pod , & pod . Spec . Containers [ container ] , & podVolumes , podInfraContainerID )
}
2014-07-01 05:27:56 +00:00
return nil
}
2014-07-18 18:42:47 +00:00
// podContainer uniquely identifies one container within one pod.
type podContainer struct {
	podFullName   string
	uid           types.UID
	containerName string
}
2014-07-03 01:06:54 +00:00
2014-07-30 21:04:19 +00:00
// Stores all volumes defined by the set of pods into a map.
// Keys for each entry are in the format (POD_ID)/(VOLUME_NAME)
2014-10-08 19:56:02 +00:00
func getDesiredVolumes ( pods [ ] api . BoundPod ) map [ string ] api . Volume {
2014-07-30 21:04:19 +00:00
desiredVolumes := make ( map [ string ] api . Volume )
2014-07-25 20:16:59 +00:00
for _ , pod := range pods {
2014-10-08 19:56:02 +00:00
for _ , volume := range pod . Spec . Volumes {
2014-11-23 15:47:25 +00:00
identifier := path . Join ( string ( pod . UID ) , volume . Name )
2014-07-30 21:04:19 +00:00
desiredVolumes [ identifier ] = volume
2014-07-25 20:16:59 +00:00
}
}
2014-07-30 21:04:19 +00:00
return desiredVolumes
2014-07-25 20:16:59 +00:00
}
2015-01-12 00:42:11 +00:00
func ( kl * Kubelet ) cleanupOrphanedPods ( pods [ ] api . BoundPod ) error {
desired := util . NewStringSet ( )
for i := range pods {
2015-01-14 21:53:43 +00:00
desired . Insert ( string ( pods [ i ] . UID ) )
2015-01-12 00:42:11 +00:00
}
found , err := kl . listPodsFromDisk ( )
if err != nil {
return err
}
errlist := [ ] error { }
for i := range found {
2015-01-14 21:53:43 +00:00
if ! desired . Has ( string ( found [ i ] ) ) {
2015-01-12 00:42:11 +00:00
glog . V ( 3 ) . Infof ( "Orphaned pod %q found, removing" , found [ i ] )
2014-11-23 15:47:25 +00:00
if err := os . RemoveAll ( kl . getPodDir ( found [ i ] ) ) ; err != nil {
2015-01-12 00:42:11 +00:00
errlist = append ( errlist , err )
}
}
}
2015-03-06 07:56:30 +00:00
return utilErrors . NewAggregate ( errlist )
2015-01-12 00:42:11 +00:00
}
2014-07-30 21:04:19 +00:00
// Compares the map of current volumes to the map of desired volumes.
// If an active volume does not have a respective desired volume, clean it up.
// Volumes whose pod still has a running container are skipped so teardown
// never races with container shutdown.
func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker.Container) error {
	desiredVolumes := getDesiredVolumes(pods)
	currentVolumes := kl.getPodVolumesFromDisk()
	// Collect the UIDs of pods that still have at least one running container.
	runningSet := util.StringSet{}
	for ix := range running {
		if len(running[ix].Name) == 0 {
			glog.V(2).Infof("Found running container ix=%d with info: %+v", ix, running[ix])
		}
		_, uid, _, _, err := dockertools.ParseDockerName(running[ix].Name)
		if err != nil {
			// Not a kubelet-managed container name; it cannot pin any of our volumes.
			continue
		}
		runningSet.Insert(string(uid))
	}
	for name, vol := range currentVolumes {
		if _, ok := desiredVolumes[name]; !ok {
			// Volume keys have the form (POD_UID)/(VOLUME_NAME); parts[0] is the pod UID.
			parts := strings.Split(name, "/")
			if runningSet.Has(parts[0]) {
				glog.Infof("volume %s, still has a container running %s, skipping teardown", name, parts[0])
				continue
			}
			//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
			//to be deleted and volumes that are leftover after a crash.
			glog.Warningf("Orphaned volume %q found, tearing down volume", name)
			//TODO (jonesdl) This should not block other kubelet synchronization procedures
			err := vol.TearDown()
			if err != nil {
				// Best-effort: log and continue with the remaining volumes.
				glog.Errorf("Could not tear down volume %q: %v", name, err)
			}
		}
	}
	return nil
}
2014-07-15 20:24:41 +00:00
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
// It prunes cached statuses for unbound pods, filters rejected (Failed) pods,
// dispatches per-pod syncs to async workers, kills containers that belong to
// no desired pod, and finally cleans up orphaned volumes and pod directories.
func (kl *Kubelet) SyncPods(allPods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
	defer func() {
		metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
	}()

	// Remove obsolete entries in podStatus where the pod is no longer considered bound to this node.
	podFullNames := make(map[string]bool)
	for _, pod := range allPods {
		podFullNames[GetPodFullName(&pod)] = true
	}
	kl.removeOrphanedStatuses(podFullNames)

	// Filtered out the rejected pod. They don't have running containers.
	var pods []api.BoundPod
	for _, pod := range allPods {
		status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
		if ok && status.Phase == api.PodFailed {
			continue
		}
		pods = append(pods, pod)
	}

	glog.V(4).Infof("Desired: %#v", pods)
	var err error
	desiredContainers := make(map[podContainer]empty)
	desiredPods := make(map[types.UID]empty)

	dockerContainers, err := kl.dockerCache.RunningContainers()
	if err != nil {
		glog.Errorf("Error listing containers: %#v", dockerContainers)
		return err
	}

	// Check for any containers that need starting
	for ix := range pods {
		pod := &pods[ix]
		podFullName := GetPodFullName(pod)
		uid := pod.UID
		desiredPods[uid] = empty{}

		// Add all containers (including net) to the map.
		desiredContainers[podContainer{podFullName, uid, dockertools.PodInfraContainerName}] = empty{}
		for _, cont := range pod.Spec.Containers {
			desiredContainers[podContainer{podFullName, uid, cont.Name}] = empty{}
		}

		// Run the sync in an async manifest worker.
		kl.podWorkers.UpdatePod(pod, func() {
			metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
		})

		// Note the number of containers for new pods.
		if val, ok := podSyncTypes[pod.UID]; ok && (val == metrics.SyncPodCreate) {
			metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
		}
	}

	// Stop the workers for no-longer existing pods.
	kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)

	if !kl.sourcesReady() {
		// If the sources aren't ready, skip deletion, as we may accidentally delete pods
		// for sources that haven't reported yet.
		glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
		return nil
	}

	// Kill any containers we don't need.
	killed := []string{}
	for ix := range dockerContainers {
		// Don't kill containers that are in the desired pods.
		podFullName, uid, containerName, _, err := dockertools.ParseDockerName(dockerContainers[ix].Names[0])
		_, found := desiredPods[uid]
		if err == nil && found {
			// syncPod() will handle this one.
			continue
		}

		pc := podContainer{podFullName, uid, containerName}
		_, ok := desiredContainers[pc]
		// Kill when the name failed to parse (not ours in the expected format)
		// or when no desired pod wants this container.
		if err != nil || !ok {
			glog.V(1).Infof("Killing unwanted container %+v", pc)
			err = kl.killContainer(dockerContainers[ix])
			if err != nil {
				glog.Errorf("Error killing container %+v: %v", pc, err)
			} else {
				// Track what we killed so the running-container poll below excludes it.
				killed = append(killed, dockerContainers[ix].ID)
			}
		}
	}

	running, err := dockertools.GetRunningContainers(kl.dockerClient, killed)
	if err != nil {
		glog.Errorf("Failed to poll container state: %v", err)
		return err
	}

	// Remove any orphaned volumes.
	err = kl.cleanupOrphanedVolumes(pods, running)
	if err != nil {
		return err
	}

	// Remove any orphaned pods.
	err = kl.cleanupOrphanedPods(pods)
	if err != nil {
		return err
	}

	return err
}
2014-10-30 17:45:22 +00:00
func updateBoundPods ( changed [ ] api . BoundPod , current [ ] api . BoundPod ) [ ] api . BoundPod {
updated := [ ] api . BoundPod { }
2015-01-14 23:22:21 +00:00
m := map [ types . UID ] * api . BoundPod { }
2014-10-30 17:45:22 +00:00
for i := range changed {
pod := & changed [ i ]
m [ pod . UID ] = pod
}
for i := range current {
pod := & current [ i ]
if m [ pod . UID ] != nil {
updated = append ( updated , * m [ pod . UID ] )
2015-01-06 00:38:47 +00:00
glog . V ( 4 ) . Infof ( "pod with UID: %q has a new spec %+v" , pod . UID , * m [ pod . UID ] )
2014-10-30 17:45:22 +00:00
} else {
updated = append ( updated , * pod )
2015-01-06 00:38:47 +00:00
glog . V ( 4 ) . Infof ( "pod with UID: %q stay with the same spec %+v" , pod . UID , * pod )
2014-10-30 17:45:22 +00:00
}
}
return updated
}
2015-02-27 21:43:21 +00:00
// podsByCreationTime implements sort.Interface, ordering BoundPods by
// ascending CreationTimestamp (oldest first).
type podsByCreationTime []api.BoundPod

func (s podsByCreationTime) Len() int      { return len(s) }
func (s podsByCreationTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s podsByCreationTime) Less(i, j int) bool {
	return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
}
2015-03-03 18:33:25 +00:00
// getHostPortConflicts detects pods with conflicted host ports and return them.
// Ports are accumulated across pods, so the first pod (by creation time) to
// claim a HostPort keeps it and later claimants are reported as conflicts.
// NOTE(review): sort.Sort reorders the caller's slice in place; callers pass
// kl.pods, which is therefore reordered as a side effect — confirm intended.
func getHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
	conflicts := []api.BoundPod{}
	ports := map[int]bool{}
	extract := func(p *api.ContainerPort) int { return p.HostPort }
	// Respect the pod creation order when resolving conflicts.
	sort.Sort(podsByCreationTime(pods))
	for i := range pods {
		pod := &pods[i]
		if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
			glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
			conflicts = append(conflicts, *pod)
			continue
		}
	}
	return conflicts
}
// handleHostPortConflicts handles pods that conflict on Port.HostPort values.
func ( kl * Kubelet ) handleHostPortConflicts ( pods [ ] api . BoundPod ) {
conflicts := getHostPortConflicts ( pods )
for _ , pod := range conflicts {
kl . recorder . Eventf ( & pod , "hostPortConflict" , "Cannot start the pod due to host port conflict." )
kl . setPodStatusInCache ( GetPodFullName ( & pod ) , api . PodStatus {
Phase : api . PodFailed ,
Message : "Pod cannot be started due to host port conflict" } )
}
2014-07-08 04:48:47 +00:00
}
2014-07-01 20:01:39 +00:00
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync_frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
	for {
		unsyncedPod := false
		// Accumulates the sync type (create/update/sync) per pod UID for
		// every update batched into this iteration.
		podSyncTypes := make(map[types.UID]metrics.SyncPodType)
		select {
		case u := <-updates:
			kl.updatePods(u, podSyncTypes)
			unsyncedPod = true
		case <-time.After(kl.resyncInterval):
			glog.V(4).Infof("Periodic sync")
		}
		start := time.Now()
		// If we already caught some update, try to wait for some short time
		// to possibly batch it with other incoming updates.
		for unsyncedPod {
			select {
			case u := <-updates:
				kl.updatePods(u, podSyncTypes)
			case <-time.After(5 * time.Millisecond):
				// Break the for loop.
				unsyncedPod = false
			}
		}

		pods, err := kl.GetBoundPods()
		if err != nil {
			glog.Errorf("Failed to get bound pods.")
			return
		}
		if err := handler.SyncPods(pods, podSyncTypes, start); err != nil {
			// Keep looping; the next iteration retries the sync.
			glog.Errorf("Couldn't sync containers: %v", err)
		}
	}
}
2015-02-24 23:29:18 +00:00
// Updated the Kubelet's internal pods with those provided by the update.
// Records new and updated pods in newPods and updatedPods.
func ( kl * Kubelet ) updatePods ( u PodUpdate , podSyncTypes map [ types . UID ] metrics . SyncPodType ) {
2015-03-13 17:31:26 +00:00
kl . podLock . Lock ( )
defer kl . podLock . Unlock ( )
2015-02-18 15:08:32 +00:00
switch u . Op {
2015-02-19 07:01:16 +00:00
case SET :
glog . V ( 3 ) . Infof ( "SET: Containers changed" )
2015-02-24 23:29:18 +00:00
// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make ( map [ types . UID ] struct { } )
for i := range kl . pods {
existingPods [ kl . pods [ i ] . UID ] = struct { } { }
}
for i := range u . Pods {
if _ , ok := existingPods [ u . Pods [ i ] . UID ] ; ! ok {
podSyncTypes [ u . Pods [ i ] . UID ] = metrics . SyncPodCreate
}
}
2015-02-19 07:01:16 +00:00
kl . pods = u . Pods
2015-03-03 18:33:25 +00:00
kl . handleHostPortConflicts ( kl . pods )
2015-02-19 07:01:16 +00:00
case UPDATE :
glog . V ( 3 ) . Infof ( "Update: Containers changed" )
2015-02-24 23:29:18 +00:00
// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u . Pods {
podSyncTypes [ u . Pods [ i ] . UID ] = metrics . SyncPodUpdate
}
2015-02-19 07:01:16 +00:00
kl . pods = updateBoundPods ( u . Pods , kl . pods )
2015-03-03 18:33:25 +00:00
kl . handleHostPortConflicts ( kl . pods )
2015-02-19 07:01:16 +00:00
default :
panic ( "syncLoop does not support incremental changes" )
2015-02-18 15:08:32 +00:00
}
2015-02-24 23:29:18 +00:00
// Mark all remaining pods as sync.
for i := range kl . pods {
if _ , ok := podSyncTypes [ kl . pods [ i ] . UID ] ; ! ok {
podSyncTypes [ u . Pods [ i ] . UID ] = metrics . SyncPodSync
}
}
2015-02-18 15:08:32 +00:00
}
2015-02-04 17:14:17 +00:00
// Returns Docker version for this Kubelet.
2015-02-04 19:50:21 +00:00
func ( kl * Kubelet ) GetDockerVersion ( ) ( [ ] uint , error ) {
2015-02-04 17:14:17 +00:00
if kl . dockerClient == nil {
2015-02-04 19:50:21 +00:00
return nil , fmt . Errorf ( "no Docker client" )
2015-02-04 17:14:17 +00:00
}
2015-02-04 19:50:21 +00:00
dockerRunner := dockertools . NewDockerContainerCommandRunner ( kl . dockerClient )
return dockerRunner . GetDockerServerVersion ( )
2015-02-04 17:14:17 +00:00
}
2015-02-24 00:33:43 +00:00
// validatePodPhase returns nil when the pod is in a phase whose containers
// can be inspected (Running, Succeeded, or Failed); otherwise it returns an
// error describing the unacceptable phase.
func (kl *Kubelet) validatePodPhase(podStatus *api.PodStatus) error {
	phase := podStatus.Phase
	if phase == api.PodRunning || phase == api.PodSucceeded || phase == api.PodFailed {
		return nil
	}
	return fmt.Errorf("pod is not in 'Running', 'Succeeded' or 'Failed' state - State: %q", phase)
}
// validateContainerStatus looks up containerName in the pod's status Info and
// returns its Docker container ID with the Docker URI prefix stripped.
// It fails when the container is unknown or still in the waiting state.
// Improvement: the original linearly scanned the Info map comparing keys and
// then performed a second redundant map lookup; a single direct lookup is
// equivalent and simpler.
func (kl *Kubelet) validateContainerStatus(podStatus *api.PodStatus, containerName string) (dockerID string, err error) {
	cStatus, found := podStatus.Info[containerName]
	if !found {
		return "", fmt.Errorf("container %q not found in pod", containerName)
	}
	if cStatus.State.Waiting != nil {
		return "", fmt.Errorf("container %q is in waiting state.", containerName)
	}
	return strings.Replace(cStatus.ContainerID, dockertools.DockerPrefix, "", 1), nil
}
2014-08-27 19:41:32 +00:00
// GetKubeletContainerLogs returns logs from the container
2015-01-14 02:11:24 +00:00
// The second parameter of GetPodStatus and FindPodContainer methods represents pod UUID, which is allowed to be blank
2015-01-07 15:18:56 +00:00
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
// or all of them.
2014-09-22 20:14:23 +00:00
func ( kl * Kubelet ) GetKubeletContainerLogs ( podFullName , containerName , tail string , follow bool , stdout , stderr io . Writer ) error {
2015-02-12 01:03:59 +00:00
podStatus , err := kl . GetPodStatus ( podFullName , "" )
2014-09-17 19:00:09 +00:00
if err != nil {
2015-02-12 01:03:59 +00:00
if err == dockertools . ErrNoContainersInPod {
return fmt . Errorf ( "pod %q not found\n" , podFullName )
} else {
return fmt . Errorf ( "failed to get status for pod %q - %v" , podFullName , err )
}
2014-09-17 19:00:09 +00:00
}
2015-02-24 00:33:43 +00:00
if err := kl . validatePodPhase ( & podStatus ) ; err != nil {
return err
2015-02-12 01:03:59 +00:00
}
2015-02-24 00:33:43 +00:00
dockerContainerID , err := kl . validateContainerStatus ( & podStatus , containerName )
if err != nil {
return err
2015-02-12 01:03:59 +00:00
}
return dockertools . GetKubeletDockerContainerLogs ( kl . dockerClient , dockerContainerID , tail , follow , stdout , stderr )
2014-08-27 19:41:32 +00:00
}
2015-02-09 16:40:42 +00:00
// GetHostname returns the hostname as the kubelet sees it.
func (kl *Kubelet) GetHostname() string {
	return kl.hostname
}
2015-02-23 21:04:45 +00:00
// GetBoundPods returns all pods bound to the kubelet and their spec.
2014-10-22 23:52:38 +00:00
func ( kl * Kubelet ) GetBoundPods ( ) ( [ ] api . BoundPod , error ) {
2015-02-17 18:53:04 +00:00
kl . podLock . RLock ( )
defer kl . podLock . RUnlock ( )
return append ( [ ] api . BoundPod { } , kl . pods ... ) , nil
2014-10-22 23:52:38 +00:00
}
2015-02-17 18:53:04 +00:00
// GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found.
2015-01-07 15:18:56 +00:00
func ( kl * Kubelet ) GetPodByName ( namespace , name string ) ( * api . BoundPod , bool ) {
2015-02-17 18:53:04 +00:00
kl . podLock . RLock ( )
defer kl . podLock . RUnlock ( )
2015-01-07 15:18:56 +00:00
for i := range kl . pods {
2015-02-17 18:53:04 +00:00
pod := kl . pods [ i ]
2015-01-07 15:18:56 +00:00
if pod . Namespace == namespace && pod . Name == name {
2015-02-17 18:53:04 +00:00
return & pod , true
2015-01-07 15:18:56 +00:00
}
}
return nil , false
}
2015-02-23 21:04:45 +00:00
// updateNodeStatus updates node status to master with retries.
// It returns nil on the first successful attempt, or an error after
// nodeStatusUpdateRetry consecutive failures.
func (kl *Kubelet) updateNodeStatus() error {
	for attempt := 0; attempt < nodeStatusUpdateRetry; attempt++ {
		err := kl.tryUpdateNodeStatus()
		if err == nil {
			return nil
		}
		glog.Errorf("error updating node status, will retry: %v", err)
	}
	return fmt.Errorf("Update node status exceeds retry count")
}
// tryUpdateNodeStatus tries to update node status to master.
// It fetches the node object, refreshes machine info from cAdvisor (machine
// and system IDs plus CPU/memory capacity) when available, sets or refreshes
// the NodeReady condition, and writes the node back.
func (kl *Kubelet) tryUpdateNodeStatus() error {
	node, err := kl.kubeClient.Nodes().Get(kl.hostname)
	if err != nil {
		return fmt.Errorf("error getting node %s: %v", kl.hostname, err)
	}
	if node == nil {
		return fmt.Errorf("no node instance returned for %v", kl.hostname)
	}
	// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
	// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
	info, err := kl.GetMachineInfo()
	if err != nil {
		// BUG FIX: was glog.Error, which does not interpret the %v verb.
		glog.Errorf("error getting machine info: %v", err)
	} else {
		node.Status.NodeInfo.MachineID = info.MachineID
		node.Status.NodeInfo.SystemUUID = info.SystemUUID
		node.Spec.Capacity = api.ResourceList{
			api.ResourceCPU: *resource.NewMilliQuantity(
				int64(info.NumCores*1000),
				resource.DecimalSI),
			api.ResourceMemory: *resource.NewQuantity(
				info.MemoryCapacity,
				resource.BinarySI),
		}
	}
	newCondition := api.NodeCondition{
		Type:   api.NodeReady,
		Status: api.ConditionFull,
		// Constant message; the original fmt.Sprintf with no arguments was redundant.
		Reason:        "kubelet is posting ready status",
		LastProbeTime: util.Now(),
	}
	// Replace the existing NodeReady condition in place, or append it if absent.
	updated := false
	for i := range node.Status.Conditions {
		if node.Status.Conditions[i].Type == api.NodeReady {
			node.Status.Conditions[i] = newCondition
			updated = true
		}
	}
	if !updated {
		node.Status.Conditions = append(node.Status.Conditions, newCondition)
	}
	_, err = kl.kubeClient.Nodes().Update(node)
	return err
}
2015-01-28 17:56:35 +00:00
// getPhase returns the phase of a pod given its container info.
// It tallies each spec'd container into exactly one running/stopped/waiting/
// unknown bucket (stopped containers are further split into succeeded/failed
// by exit code), then maps the tallies to a PodPhase honoring RestartPolicy.
func getPhase(spec *api.PodSpec, info api.PodInfo) api.PodPhase {
	running := 0
	waiting := 0
	stopped := 0
	failed := 0
	succeeded := 0
	unknown := 0
	for _, container := range spec.Containers {
		if containerStatus, ok := info[container.Name]; ok {
			if containerStatus.State.Running != nil {
				running++
			} else if containerStatus.State.Termination != nil {
				stopped++
				if containerStatus.State.Termination.ExitCode == 0 {
					succeeded++
				} else {
					failed++
				}
			} else if containerStatus.State.Waiting != nil {
				waiting++
			} else {
				unknown++
			}
		} else {
			// Container declared in spec but absent from info.
			unknown++
		}
	}
	switch {
	case waiting > 0:
		glog.V(5).Infof("pod waiting > 0, pending")
		// One or more containers has not been started
		return api.PodPending
	case running > 0 && unknown == 0:
		// All containers have been started, and at least
		// one container is running
		return api.PodRunning
	case running == 0 && stopped > 0 && unknown == 0:
		// All containers are terminated
		if spec.RestartPolicy.Always != nil {
			// All containers are in the process of restarting
			return api.PodRunning
		}
		if stopped == succeeded {
			// RestartPolicy is not Always, and all
			// containers are terminated in success
			return api.PodSucceeded
		}
		if spec.RestartPolicy.Never != nil {
			// RestartPolicy is Never, and all containers are
			// terminated with at least one in failure
			return api.PodFailed
		}
		// RestartPolicy is OnFailure, and at least one in failure
		// and in the process of restarting
		return api.PodRunning
	default:
		glog.V(5).Infof("pod default case, pending")
		return api.PodPending
	}
}
2015-02-02 18:51:52 +00:00
// getPodReadyCondition returns ready condition if all containers in a pod are ready, else it returns an unready condition.
func getPodReadyCondition ( spec * api . PodSpec , info api . PodInfo ) [ ] api . PodCondition {
ready := [ ] api . PodCondition { {
2015-02-24 05:21:14 +00:00
Type : api . PodReady ,
2015-02-02 18:51:52 +00:00
Status : api . ConditionFull ,
} }
unready := [ ] api . PodCondition { {
2015-02-24 05:21:14 +00:00
Type : api . PodReady ,
2015-02-02 18:51:52 +00:00
Status : api . ConditionNone ,
} }
if info == nil {
return unready
}
for _ , container := range spec . Containers {
if containerStatus , ok := info [ container . Name ] ; ok {
if ! containerStatus . Ready {
return unready
}
} else {
return unready
}
}
return ready
}
2015-02-17 18:53:04 +00:00
func ( kl * Kubelet ) GetPodByFullName ( podFullName string ) ( * api . PodSpec , bool ) {
kl . podLock . RLock ( )
defer kl . podLock . RUnlock ( )
2014-10-03 06:39:02 +00:00
for _ , pod := range kl . pods {
2014-10-06 18:54:51 +00:00
if GetPodFullName ( & pod ) == podFullName {
2015-02-17 18:53:04 +00:00
return & pod . Spec , true
2014-10-03 06:39:02 +00:00
}
}
2015-02-17 18:53:04 +00:00
return nil , false
}
// GetPodStatus returns information from Docker about the containers in a pod
func ( kl * Kubelet ) GetPodStatus ( podFullName string , uid types . UID ) ( api . PodStatus , error ) {
var podStatus api . PodStatus
spec , found := kl . GetPodByFullName ( podFullName )
2015-02-09 21:55:36 +00:00
if ! found {
return podStatus , fmt . Errorf ( "Couldn't find spec for pod %s" , podFullName )
}
2015-01-14 02:11:24 +00:00
2015-03-03 18:33:25 +00:00
// Check to see if the pod has been rejected.
mappedPodStatus , ok := kl . getPodStatusFromCache ( podFullName )
if ok {
return mappedPodStatus , nil
}
2015-02-17 18:53:04 +00:00
info , err := dockertools . GetDockerPodInfo ( kl . dockerClient , * spec , podFullName , uid )
2015-01-14 02:11:24 +00:00
2015-02-09 21:55:36 +00:00
if err != nil {
2015-02-09 21:55:36 +00:00
// Error handling
glog . Infof ( "Query docker container info for pod %s failed with error (%v)" , podFullName , err )
if strings . Contains ( err . Error ( ) , "resource temporarily unavailable" ) {
// Leave upstream layer to decide what to do
return podStatus , err
} else {
podStatus . Phase = api . PodPending
podStatus . Message = fmt . Sprintf ( "Query docker container info failed with error (%v)" , err )
return podStatus , nil
}
2015-02-09 21:55:36 +00:00
}
2015-02-09 21:55:36 +00:00
// Assume info is ready to process
2015-02-17 18:53:04 +00:00
podStatus . Phase = getPhase ( spec , info )
2015-02-27 02:02:04 +00:00
podStatus . Info = api . PodInfo { }
2015-02-02 18:51:52 +00:00
for _ , c := range spec . Containers {
containerStatus := info [ c . Name ]
containerStatus . Ready = kl . readiness . IsReady ( containerStatus )
2015-02-27 02:02:04 +00:00
podStatus . Info [ c . Name ] = containerStatus
2015-02-02 18:51:52 +00:00
}
2015-02-27 02:02:04 +00:00
podStatus . Conditions = append ( podStatus . Conditions , getPodReadyCondition ( spec , podStatus . Info ) ... )
2015-02-09 21:55:36 +00:00
2015-01-28 17:56:35 +00:00
netContainerInfo , found := info [ dockertools . PodInfraContainerName ]
if found {
podStatus . PodIP = netContainerInfo . PodIP
}
2015-02-09 21:55:36 +00:00
return podStatus , nil
2014-07-15 17:26:56 +00:00
}
2014-07-15 07:04:30 +00:00
// ServeLogs returns logs of current machine by delegating the HTTP request
// to the kubelet's log server.
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
	// TODO: whitelist logs we are willing to serve
	kl.logServer.ServeHTTP(w, req)
}
2014-08-07 18:15:11 +00:00
// Run a command in a container, returns the combined stdout, stderr as an array of bytes
2015-01-14 23:22:21 +00:00
func ( kl * Kubelet ) RunInContainer ( podFullName string , uid types . UID , container string , cmd [ ] string ) ( [ ] byte , error ) {
2014-08-07 18:15:11 +00:00
if kl . runner == nil {
return nil , fmt . Errorf ( "no runner specified." )
}
2014-09-29 21:38:31 +00:00
dockerContainers , err := dockertools . GetKubeletDockerContainers ( kl . dockerClient , false )
2014-08-07 18:15:11 +00:00
if err != nil {
return nil , err
}
2015-01-14 21:53:43 +00:00
dockerContainer , found , _ := dockerContainers . FindPodContainer ( podFullName , uid , container )
2014-08-07 18:15:11 +00:00
if ! found {
2015-01-06 00:38:47 +00:00
return nil , fmt . Errorf ( "container not found (%q)" , container )
2014-08-07 18:15:11 +00:00
}
return kl . runner . RunInContainer ( dockerContainer . ID , cmd )
}
2014-11-10 21:13:57 +00:00
2015-01-08 20:41:38 +00:00
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
	if kl.runner == nil {
		return fmt.Errorf("no runner specified.")
	}
	dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
	if err != nil {
		return err
	}
	target, found, _ := dockerContainers.FindPodContainer(podFullName, uid, container)
	if !found {
		return fmt.Errorf("container not found (%q)", container)
	}
	return kl.runner.ExecInContainer(target.ID, cmd, stdin, stdout, stderr, tty)
}
// PortForward connects to the pod's port and copies data between the port
// and the stream. Traffic is proxied through the pod infra container.
func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
	if kl.runner == nil {
		return fmt.Errorf("no runner specified.")
	}
	dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
	if err != nil {
		return err
	}
	infra, found, _ := dockerContainers.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName)
	if !found {
		return fmt.Errorf("Unable to find pod infra container for pod %s, uid %v", podFullName, uid)
	}
	return kl.runner.PortForward(infra.ID, port, stream)
}
2014-11-10 21:13:57 +00:00
// BirthCry sends an event that the kubelet has started up.
func ( kl * Kubelet ) BirthCry ( ) {
// Make an event that kubelet restarted.
// TODO: get the real minion object of ourself,
// and use the real minion name and UID.
ref := & api . ObjectReference {
2014-11-17 23:34:07 +00:00
Kind : "Minion" ,
Name : kl . hostname ,
2015-01-14 23:22:21 +00:00
UID : types . UID ( kl . hostname ) ,
2014-11-17 23:34:07 +00:00
Namespace : api . NamespaceDefault ,
2014-11-10 21:13:57 +00:00
}
2015-03-03 06:06:20 +00:00
kl . recorder . Eventf ( ref , "starting" , "Starting kubelet." )
2014-11-10 21:13:57 +00:00
}
2015-01-08 20:41:38 +00:00
// StreamingConnectionIdleTimeout returns the configured idle timeout for
// streaming connections.
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
	return kl.streamingConnectionIdleTimeout
}
2015-03-06 07:56:30 +00:00
// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
	dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
	if err != nil {
		return nil, err
	}
	if len(dockerContainers) == 0 {
		return nil, ErrNoKubeletContainers
	}
	target, found, _ := dockerContainers.FindPodContainer(podFullName, uid, containerName)
	if !found {
		return nil, ErrContainerNotFound
	}
	stats, err := kl.cadvisor.DockerContainer(target.ID, req)
	if err != nil {
		return nil, err
	}
	return &stats, nil
}
// GetRootInfo returns stats (from Cadvisor) of current machine (root container).
func (kl *Kubelet) GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
	return kl.cadvisor.ContainerInfo("/", req)
}
// GetMachineInfo returns machine information (from cAdvisor) for the node
// this kubelet runs on.
func (kl *Kubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
	return kl.cadvisor.MachineInfo()
}