/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet
// Note: if you change code in this file, you might need to change code in
// contrib/mesos/pkg/executor/.
import (
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"path"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/golang/glog"
	cadvisorApi "github.com/google/cadvisor/info/v1"
	"k8s.io/kubernetes/pkg/api"
	apierrors "k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/pkg/api/validation"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/client/unversioned/cache"
	"k8s.io/kubernetes/pkg/client/unversioned/record"
	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/fieldpath"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/dockertools"
	"k8s.io/kubernetes/pkg/kubelet/envvars"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/kubernetes/pkg/kubelet/network"
	"k8s.io/kubernetes/pkg/kubelet/rkt"
	kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
	kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/types"
	"k8s.io/kubernetes/pkg/util"
	utilErrors "k8s.io/kubernetes/pkg/util/errors"
	"k8s.io/kubernetes/pkg/util/mount"
	nodeutil "k8s.io/kubernetes/pkg/util/node"
	"k8s.io/kubernetes/pkg/util/oom"
	"k8s.io/kubernetes/pkg/util/procfs"
	"k8s.io/kubernetes/pkg/version"
	"k8s.io/kubernetes/pkg/volume"
	"k8s.io/kubernetes/pkg/watch"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/third_party/golang/expansion"
)

const (
	// Max amount of time to wait for the container runtime to come up.
	maxWaitForContainerRuntime = 5 * time.Minute

	// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
	nodeStatusUpdateRetry = 5

	// Location of container logs.
	containerLogsDir = "/var/log/containers"

	// max backoff period
	maxContainerBackOff = 300 * time.Second
)

var (
	// ErrContainerNotFound returned when a container in the given pod with the
	// given container name was not found, amongst those managed by the kubelet.
	ErrContainerNotFound = errors.New("no matching container")
)

// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
	// Syncs current state to match the specified pods. SyncPodType specifies what
	// type of sync is occurring per pod. StartTime specifies the time at which
	// syncing began (for use in monitoring).
	SyncPods(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, mirrorPods map[string]*api.Pod,
		startTime time.Time) error
}

type SourcesReadyFn func() bool

// Wait for the container runtime to be up with a timeout.
func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error {
	var err error
	waitStart := time.Now()
	for time.Since(waitStart) < timeout {
		_, err = cr.Version()
		if err == nil {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return err
}

// New creates a new Kubelet for use in main
func NewMainKubelet(
	hostname string,
	nodeName string,
	dockerClient dockertools.DockerInterface,
	kubeClient client.Interface,
	rootDirectory string,
	podInfraContainerImage string,
	resyncInterval time.Duration,
	pullQPS float32,
	pullBurst int,
	containerGCPolicy ContainerGCPolicy,
	sourcesReady SourcesReadyFn,
	registerNode bool,
	standaloneMode bool,
	clusterDomain string,
	clusterDNS net.IP,
	masterServiceNamespace string,
	volumePlugins []volume.VolumePlugin,
	networkPlugins []network.NetworkPlugin,
	networkPluginName string,
	streamingConnectionIdleTimeout time.Duration,
	recorder record.EventRecorder,
	cadvisorInterface cadvisor.Interface,
	imageGCPolicy ImageGCPolicy,
	diskSpacePolicy DiskSpacePolicy,
	cloud cloudprovider.Interface,
	nodeStatusUpdateFrequency time.Duration,
	resourceContainer string,
	osInterface kubecontainer.OSInterface,
	cgroupRoot string,
	containerRuntime string,
	rktPath string,
	mounter mount.Interface,
	dockerDaemonContainer string,
	systemContainer string,
	configureCBR0 bool,
	podCIDR string,
	pods int,
	dockerExecHandler dockertools.ExecHandler) (*Kubelet, error) {
	if rootDirectory == "" {
		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
	}
	if resyncInterval <= 0 {
		return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
	}
	if systemContainer != "" && cgroupRoot == "" {
		return nil, fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
	}
	dockerClient = dockertools.NewInstrumentedDockerInterface(dockerClient)

	serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
	if kubeClient != nil {
		// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
		// than an interface. There is no way to construct a list+watcher using resource name.
		listWatch := &cache.ListWatch{
			ListFunc: func() (runtime.Object, error) {
				return kubeClient.Services(api.NamespaceAll).List(labels.Everything())
			},
			WatchFunc: func(resourceVersion string) (watch.Interface, error) {
				return kubeClient.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
			},
		}
		cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
	}
	serviceLister := &cache.StoreToServiceLister{Store: serviceStore}

	nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
	if kubeClient != nil {
		// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
		// than an interface. There is no way to construct a list+watcher using resource name.
		fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector()
		listWatch := &cache.ListWatch{
			ListFunc: func() (runtime.Object, error) {
				return kubeClient.Nodes().List(labels.Everything(), fieldSelector)
			},
			WatchFunc: func(resourceVersion string) (watch.Interface, error) {
				return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
			},
		}
		cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
	}
	nodeLister := &cache.StoreToNodeLister{Store: nodeStore}

	// TODO: get the real minion object of ourself,
	// and use the real minion name and UID.
	// TODO: what is namespace for node?
	nodeRef := &api.ObjectReference{
		Kind:      "Node",
		Name:      nodeName,
		UID:       types.UID(nodeName),
		Namespace: "",
	}

	containerGC, err := newContainerGC(dockerClient, containerGCPolicy)
	if err != nil {
		return nil, err
	}
	imageManager, err := newImageManager(dockerClient, cadvisorInterface, recorder, nodeRef, imageGCPolicy)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
	}
	diskSpaceManager, err := newDiskSpaceManager(cadvisorInterface, diskSpacePolicy)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
	}
	statusManager := newStatusManager(kubeClient)
	readinessManager := kubecontainer.NewReadinessManager()
	containerRefManager := kubecontainer.NewRefManager()
	volumeManager := newVolumeManager()

	oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)

	klet := &Kubelet{
		hostname:                       hostname,
		nodeName:                       nodeName,
		dockerClient:                   dockerClient,
		kubeClient:                     kubeClient,
		rootDirectory:                  rootDirectory,
		resyncInterval:                 resyncInterval,
		containerRefManager:            containerRefManager,
		readinessManager:               readinessManager,
		httpClient:                     &http.Client{},
		sourcesReady:                   sourcesReady,
		registerNode:                   registerNode,
		standaloneMode:                 standaloneMode,
		clusterDomain:                  clusterDomain,
		clusterDNS:                     clusterDNS,
		serviceLister:                  serviceLister,
		nodeLister:                     nodeLister,
		runtimeMutex:                   sync.Mutex{},
		runtimeUpThreshold:             maxWaitForContainerRuntime,
		lastTimestampRuntimeUp:         time.Time{},
		masterServiceNamespace:         masterServiceNamespace,
		streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
		recorder:                       recorder,
		cadvisor:                       cadvisorInterface,
		containerGC:                    containerGC,
		imageManager:                   imageManager,
		diskSpaceManager:               diskSpaceManager,
		statusManager:                  statusManager,
		volumeManager:                  volumeManager,
		cloud:                          cloud,
		nodeRef:                        nodeRef,
		nodeStatusUpdateFrequency:      nodeStatusUpdateFrequency,
		resourceContainer:              resourceContainer,
		os:                             osInterface,
		oomWatcher:                     oomWatcher,
		cgroupRoot:                     cgroupRoot,
		mounter:                        mounter,
		configureCBR0:                  configureCBR0,
		podCIDR:                        podCIDR,
		pods:                           pods,
		syncLoopMonitor:                util.AtomicValue{},
	}

	if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
		return nil, err
	} else {
		klet.networkPlugin = plug
	}

	machineInfo, err := klet.GetCachedMachineInfo()
	if err != nil {
		return nil, err
	}
	oomAdjuster := oom.NewOomAdjuster()
	procFs := procfs.NewProcFs()

	// Initialize the runtime.
	switch containerRuntime {
	case "docker":
		// Only supported one for now, continue.
		klet.containerRuntime = dockertools.NewDockerManager(
			dockerClient,
			recorder,
			readinessManager,
			containerRefManager,
			machineInfo,
			podInfraContainerImage,
			pullQPS,
			pullBurst,
			containerLogsDir,
			osInterface,
			klet.networkPlugin,
			klet,
			klet.httpClient,
			dockerExecHandler,
			oomAdjuster,
			procFs)
	case "rkt":
		conf := &rkt.Config{
			Path:               rktPath,
			InsecureSkipVerify: true,
		}
		rktRuntime, err := rkt.New(
			conf,
			klet,
			recorder,
			containerRefManager,
			readinessManager,
			klet.volumeManager)
		if err != nil {
			return nil, err
		}
		klet.containerRuntime = rktRuntime

		// No Docker daemon to put in a container.
		dockerDaemonContainer = ""
	default:
		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
	}

	// Setup container manager, can fail if the devices hierarchy is not mounted
	// (it is required by Docker however).
	containerManager, err := newContainerManager(cadvisorInterface, dockerDaemonContainer, systemContainer, resourceContainer)
	if err != nil {
		return nil, fmt.Errorf("failed to create the Container Manager: %v", err)
	}
	klet.containerManager = containerManager

	go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop)
	if klet.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go util.Until(klet.syncNodeStatus, klet.nodeStatusUpdateFrequency, util.NeverStop)
	}

	// Wait for the runtime to be up with a timeout.
	if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
		return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
	}
	klet.lastTimestampRuntimeUp = time.Now()

	klet.runner = klet.containerRuntime
	klet.podManager = newBasicPodManager(klet.kubeClient)

	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
	if err != nil {
		return nil, err
	}
	klet.runtimeCache = runtimeCache
	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder)

	metrics.Register(runtimeCache)

	if err = klet.setupDataDirs(); err != nil {
		return nil, err
	}
	if err = klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil {
		return nil, err
	}

	// If the container logs directory does not exist, create it.
	if _, err := os.Stat(containerLogsDir); err != nil {
		if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {
			glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
		}
	}

	klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff)

	return klet, nil
}

type serviceLister interface {
	List() (api.ServiceList, error)
}

type nodeLister interface {
	List() (machines api.NodeList, err error)
	GetNodeInfo(id string) (*api.Node, error)
}

// Kubelet is the main kubelet implementation.
type Kubelet struct {
	hostname       string
	nodeName       string
	dockerClient   dockertools.DockerInterface
	runtimeCache   kubecontainer.RuntimeCache
	kubeClient     client.Interface
	rootDirectory  string
	podWorkers     PodWorkers
	resyncInterval time.Duration
	sourcesReady   SourcesReadyFn

	podManager podManager

	// Needed to report events for containers belonging to deleted/modified pods.
	// Tracks references for reporting events
	containerRefManager *kubecontainer.RefManager

	// Optional, defaults to /logs/ from /var/log
	logServer http.Handler
	// Optional, defaults to simple Docker implementation
	runner kubecontainer.ContainerCommandRunner
	// Optional, client for http requests, defaults to empty client
	httpClient kubeletTypes.HttpGetter

	// cAdvisor used for container information.
	cadvisor cadvisor.Interface

	// Set to true to have the node register itself with the apiserver.
	registerNode bool
	// for internal book keeping; access only from within registerWithApiserver
	registrationCompleted bool

	// Set to true if the kubelet is in standalone mode (i.e. setup without an apiserver)
	standaloneMode bool

	// If non-empty, use this for container DNS search.
	clusterDomain string
	// If non-nil, use this for container DNS server.
	clusterDNS net.IP

	masterServiceNamespace string
	serviceLister          serviceLister
	nodeLister             nodeLister

	// Last timestamp when runtime responded on ping.
	// Mutex is used to protect this value.
	runtimeMutex           sync.Mutex
	runtimeUpThreshold     time.Duration
	lastTimestampRuntimeUp time.Time

	// Network Status information
	networkConfigMutex sync.Mutex
	networkConfigured  bool

	// Volume plugins.
	volumePluginMgr volume.VolumePluginMgr

	// Network plugin.
	networkPlugin network.NetworkPlugin

	// Container readiness state manager.
	readinessManager *kubecontainer.ReadinessManager

	// How long to keep idle streaming command execution/port forwarding
	// connections open before terminating them
	streamingConnectionIdleTimeout time.Duration

	// The EventRecorder to use
	recorder record.EventRecorder

	// Policy for handling garbage collection of dead containers.
	containerGC containerGC

	// Manager for images.
	imageManager imageManager

	// Diskspace manager.
	diskSpaceManager diskSpaceManager

	// Cached MachineInfo returned by cadvisor.
	machineInfo *cadvisorApi.MachineInfo

	// Syncs pods statuses with apiserver; also used as a cache of statuses.
	statusManager *statusManager

	// Manager for the volume maps for the pods.
	volumeManager *volumeManager

	// Cloud provider interface
	cloud cloudprovider.Interface

	// Reference to this node.
	nodeRef *api.ObjectReference

	// Container runtime.
	containerRuntime kubecontainer.Runtime

	// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
	// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
	// in nodecontroller. There are several constraints:
	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
	//    N means number of retries allowed for kubelet to post node status. It is pointless
	//    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
	//    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
	//    The constant must be less than podEvictionTimeout.
	// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
	//    status. Kubelet may fail to update node status reliably if the value is too small,
	//    as it takes time to gather all necessary node information.
	nodeStatusUpdateFrequency time.Duration

	// The name of the resource-only container to run the Kubelet in (empty for no container).
	// Name must be absolute.
	resourceContainer string

	os kubecontainer.OSInterface

	// Watcher of out of memory events.
	oomWatcher OOMWatcher

	// If non-empty, pass this to the container runtime as the root cgroup.
	cgroupRoot string

	// Mounter to use for volumes.
	mounter mount.Interface

	// Manager of non-Runtime containers.
	containerManager containerManager

	// Whether or not kubelet should take responsibility for keeping cbr0 in
	// the correct state.
	configureCBR0 bool
	podCIDR       string

	// Number of Pods which can be run by this Kubelet
	pods int

	// Monitor Kubelet's sync loop
	syncLoopMonitor util.AtomicValue

	// Container restart Backoff
	backOff *util.Backoff
}

// getRootDir returns the full path to the directory under which kubelet can
// store data. These functions are useful to pass interfaces to other modules
// that may need to know where to write data without getting a whole kubelet
// instance.
func (kl *Kubelet) getRootDir() string {
	return kl.rootDirectory
}

// getPodsDir returns the full path to the directory under which pod
// directories are created.
func (kl *Kubelet) getPodsDir() string {
	return path.Join(kl.getRootDir(), "pods")
}

// getPluginsDir returns the full path to the directory under which plugin
// directories are created. Plugins can use these directories for data that
// they need to persist. Plugins should create subdirectories under this named
// after their own names.
func (kl *Kubelet) getPluginsDir() string {
	return path.Join(kl.getRootDir(), "plugins")
}

// getPluginDir returns a data directory name for a given plugin name.
// Plugins can use these directories to store data that they need to persist.
// For per-pod plugin data, see getPodPluginDir.
func (kl *Kubelet) getPluginDir(pluginName string) string {
	return path.Join(kl.getPluginsDir(), pluginName)
}

// getPodDir returns the full path to the per-pod data directory for the
// specified pod. This directory may not exist if the pod does not exist.
func (kl *Kubelet) getPodDir(podUID types.UID) string {
	// Backwards compat. The "old" stuff should be removed before 1.0
	// release. The thinking here is this:
	//     !old && !new = use new
	//     !old && new  = use new
	//     old && !new  = use old
	//     old && new   = use new (but warn)
	oldPath := path.Join(kl.getRootDir(), string(podUID))
	oldExists := dirExists(oldPath)
	newPath := path.Join(kl.getPodsDir(), string(podUID))
	newExists := dirExists(newPath)
	if oldExists && !newExists {
		return oldPath
	}
	if oldExists {
		glog.Warningf("Data dir for pod %q exists in both old and new form, using new", podUID)
	}
	return newPath
}

// getPodVolumesDir returns the full path to the per-pod data directory under
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodVolumesDir(podUID types.UID) string {
	return path.Join(kl.getPodDir(podUID), "volumes")
}

// getPodVolumeDir returns the full path to the directory which represents the
// named volume under the named plugin for specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
	return path.Join(kl.getPodVolumesDir(podUID), pluginName, volumeName)
}

// getPodPluginsDir returns the full path to the per-pod data directory under
// which plugins may store data for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodPluginsDir(podUID types.UID) string {
	return path.Join(kl.getPodDir(podUID), "plugins")
}

// getPodPluginDir returns a data directory name for a given plugin name for a
// given pod UID. Plugins can use these directories to store data that they
// need to persist. For non-per-pod plugin data, see getPluginDir.
func (kl *Kubelet) getPodPluginDir(podUID types.UID, pluginName string) string {
	return path.Join(kl.getPodPluginsDir(podUID), pluginName)
}

// getPodContainerDir returns the full path to the per-pod data directory under
// which container data is held for the specified pod. This directory may not
// exist if the pod or container does not exist.
func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
	// Backwards compat. The "old" stuff should be removed before 1.0
	// release. The thinking here is this:
	//     !old && !new = use new
	//     !old && new  = use new
	//     old && !new  = use old
	//     old && new   = use new (but warn)
	oldPath := path.Join(kl.getPodDir(podUID), ctrName)
	oldExists := dirExists(oldPath)
	newPath := path.Join(kl.getPodDir(podUID), "containers", ctrName)
	newExists := dirExists(newPath)
	if oldExists && !newExists {
		return oldPath
	}
	if oldExists {
		glog.Warningf("Data dir for pod %q, container %q exists in both old and new form, using new", podUID, ctrName)
	}
	return newPath
}

func dirExists(path string) bool {
	s, err := os.Stat(path)
	if err != nil {
		return false
	}
	return s.IsDir()
}

func (kl *Kubelet) setupDataDirs() error {
	kl.rootDirectory = path.Clean(kl.rootDirectory)
	if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
		return fmt.Errorf("error creating root directory: %v", err)
	}
	if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
		return fmt.Errorf("error creating pods directory: %v", err)
	}
	if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
		return fmt.Errorf("error creating plugins directory: %v", err)
	}
	return nil
}

// Get a list of pods that have data directories.
func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
	podInfos, err := ioutil.ReadDir(kl.getPodsDir())
	if err != nil {
		return nil, err
	}
	pods := []types.UID{}
	for i := range podInfos {
		if podInfos[i].IsDir() {
			pods = append(pods, types.UID(podInfos[i].Name()))
		}
	}
	return pods, nil
}

func (kl *Kubelet) GetNode() (*api.Node, error) {
	if kl.standaloneMode {
		return nil, errors.New("no node entry for kubelet in standalone mode")
	}
	l, err := kl.nodeLister.List()
	if err != nil {
		return nil, errors.New("cannot list nodes")
	}
	nodeName := kl.nodeName
	for _, n := range l.Items {
		if n.Name == nodeName {
			return &n, nil
		}
	}
	return nil, fmt.Errorf("node %v not found", nodeName)
}

// Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
	go util.Until(func() {
		if err := kl.containerGC.GarbageCollect(); err != nil {
			glog.Errorf("Container garbage collection failed: %v", err)
		}
	}, time.Minute, util.NeverStop)

	go util.Until(func() {
		if err := kl.imageManager.GarbageCollect(); err != nil {
			glog.Errorf("Image garbage collection failed: %v", err)
		}
	}, 5*time.Minute, util.NeverStop)
}

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		glog.Warning("No api server defined - no node status update will be sent.")
	}

	// Move Kubelet to a container.
	if kl.resourceContainer != "" {
		err := util.RunInResourceContainer(kl.resourceContainer)
		if err != nil {
			glog.Warningf("Failed to move Kubelet to container %q: %v", kl.resourceContainer, err)
		}
		glog.Infof("Running in container %q", kl.resourceContainer)
	}

	if err := kl.imageManager.Start(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start ImageManager %v", err)
		glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
	}

	if err := kl.cadvisor.Start(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start CAdvisor %v", err)
		glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err)
	}

	if err := kl.containerManager.Start(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start ContainerManager %v", err)
		glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err)
	}

	if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
		kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start OOM watcher %v", err)
		glog.Errorf("Failed to start OOM watching: %v", err)
	}

	go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)

	// Run the system oom watcher forever.
	kl.statusManager.Start()
	kl.syncLoop(updates, kl)
}

func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
	node := &api.Node{
		ObjectMeta: api.ObjectMeta{
			Name:   kl.nodeName,
			Labels: map[string]string{"kubernetes.io/hostname": kl.hostname},
		},
	}
	if kl.cloud != nil {
		instances, ok := kl.cloud.Instances()
		if !ok {
			return nil, fmt.Errorf("failed to get instances from cloud provider")
		}

		// TODO(roberthbailey): Can we do this without having credentials to talk
		// to the cloud provider?
		// TODO: ExternalID is deprecated, we'll have to drop this code
		externalID, err := instances.ExternalID(kl.nodeName)
		if err != nil {
			return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
		}
		node.Spec.ExternalID = externalID

		// TODO: We can't assume that the node has credentials to talk to the
		// cloudprovider from arbitrary nodes. At most, we should talk to a
		// local metadata server here.
		node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
		if err != nil {
			return nil, err
		}
	} else {
		node.Spec.ExternalID = kl.hostname
	}
	if err := kl.setNodeStatus(node); err != nil {
		return nil, err
	}
	return node, nil
}

// registerWithApiserver registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithApiserver() {
	if kl.registrationCompleted {
		return
	}
	step := 100 * time.Millisecond
	for {
		time.Sleep(step)
		step = step * 2
		if step >= 7*time.Second {
			step = 7 * time.Second
		}
		node, err := kl.initialNodeStatus()
		if err != nil {
			glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
			continue
		}
		glog.V(2).Infof("Attempting to register node %s", node.Name)
		if _, err := kl.kubeClient.Nodes().Create(node); err != nil {
			if !apierrors.IsAlreadyExists(err) {
				glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
				continue
			}
			currentNode, err := kl.kubeClient.Nodes().Get(kl.nodeName)
			if err != nil {
				glog.Errorf("error getting node %q: %v", kl.nodeName, err)
				continue
			}
			if currentNode == nil {
				glog.Errorf("no node instance returned for %q", kl.nodeName)
				continue
			}
			if currentNode.Spec.ExternalID == node.Spec.ExternalID {
				glog.Infof("Node %s was previously registered", node.Name)
				kl.registrationCompleted = true
				return
			}
			glog.Errorf(
				"Previously %q had externalID %q; now it is %q; will delete and recreate.",
				kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID,
			)
			if err := kl.kubeClient.Nodes().Delete(node.Name); err != nil {
				glog.Errorf("Unable to delete old node: %v", err)
			} else {
				glog.Errorf("Deleted old node object %q", kl.nodeName)
			}
			continue
		}
		glog.Infof("Successfully registered node %s", node.Name)
		kl.registrationCompleted = true
		return
	}
}

// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
func (kl *Kubelet) syncNodeStatus() {
	if kl.kubeClient == nil {
		return
	}
	if kl.registerNode {
		// This will exit immediately if it doesn't need to do anything.
		kl.registerWithApiserver()
	}
	if err := kl.updateNodeStatus(); err != nil {
		glog.Errorf("Unable to update node status: %v", err)
	}
}

func makeMounts(container *api.Container, podVolumes kubecontainer.VolumeMap) (mounts []kubecontainer.Mount) {
	for _, mount := range container.VolumeMounts {
		vol, ok := podVolumes[mount.Name]
		if !ok {
			glog.Warningf("Mount cannot be satisfied for container %q, because the volume is missing: %q", container.Name, mount)
			continue
		}
		mounts = append(mounts, kubecontainer.Mount{
			Name:          mount.Name,
			ContainerPath: mount.MountPath,
			HostPath:      vol.GetPath(),
			ReadOnly:      mount.ReadOnly,
		})
	}
	return
}

func makePortMappings(container *api.Container) (ports []kubecontainer.PortMapping) {
	names := make(map[string]struct{})
	for _, p := range container.Ports {
		pm := kubecontainer.PortMapping{
			HostPort:      p.HostPort,
			ContainerPort: p.ContainerPort,
			Protocol:      p.Protocol,
			HostIP:        p.HostIP,
		}

		// We need to create some default port name if it's not specified, since
		// this is necessary for rkt.
		// http://issue.k8s.io/7710
		if p.Name == "" {
			pm.Name = fmt.Sprintf("%s-%s:%d", container.Name, p.Protocol, p.ContainerPort)
		} else {
			pm.Name = fmt.Sprintf("%s-%s", container.Name, p.Name)
		}
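		// For example, given the formats above, an unnamed TCP port 80 in a
		// container named "web" would get the generated name "web-TCP:80", while
		// a port declared with the name "http" would become "web-http".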
		// Protect against exposing the same protocol-port more than once in a container.
		if _, ok := names[pm.Name]; ok {
			glog.Warningf("Port name conflicted, %q is defined more than once", pm.Name)
			continue
		}
		ports = append(ports, pm)
		names[pm.Name] = struct{}{}
	}
	return
}

// GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
// the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Container) (*kubecontainer.RunContainerOptions, error) {
	var err error
	opts := &kubecontainer.RunContainerOptions{CgroupParent: kl.cgroupRoot}

	vol, ok := kl.volumeManager.GetVolumes(pod.UID)
	if !ok {
		return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", kubecontainer.GetPodFullName(pod))
	}

	opts.PortMappings = makePortMappings(container)
	opts.Mounts = makeMounts(container, vol)
	opts.Envs, err = kl.makeEnvironmentVariables(pod, container)
	if err != nil {
		return nil, err
	}

	if len(container.TerminationMessagePath) != 0 {
		p := kl.getPodContainerDir(pod.UID, container.Name)
		if err := os.MkdirAll(p, 0750); err != nil {
			glog.Errorf("Error on creating %q: %v", p, err)
		} else {
			opts.PodContainerDir = p
		}
	}
	if pod.Spec.DNSPolicy == api.DNSClusterFirst {
		opts.DNS, opts.DNSSearch, err = kl.getClusterDNS(pod)
		if err != nil {
			return nil, err
		}
	}
	return opts, nil
}

var masterServices = util.NewStringSet("kubernetes")

// getServiceEnvVarMap makes a map[string]string of env vars for services a pod in namespace ns should see
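// (for instance, a visible service named "redis-master" is typically surfaced
// as variables such as REDIS_MASTER_SERVICE_HOST and REDIS_MASTER_SERVICE_PORT;
// the exact set is whatever envvars.FromServices produces for the projected services).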
func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
	var (
		serviceMap = make(map[string]api.Service)
		m          = make(map[string]string)
	)

	// Get all service resources from the master (via a cache),
	// and populate them into service environment variables.
	if kl.serviceLister == nil {
		// Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
		return m, nil
	}
	services, err := kl.serviceLister.List()
	if err != nil {
		return m, fmt.Errorf("failed to list services when setting up env vars.")
	}

	// project the services in namespace ns onto the master services
	for _, service := range services.Items {
		// ignore services where ClusterIP is "None" or empty
		if !api.IsServiceIPSet(&service) {
			continue
		}
		serviceName := service.Name

		switch service.Namespace {
		// for the case where the master service namespace is the namespace the pod
		// is in, the pod should receive all the services in the namespace.
		//
		// ordering of the case clauses below enforces this
		case ns:
			serviceMap[serviceName] = service
		case kl.masterServiceNamespace:
			if masterServices.Has(serviceName) {
				if _, exists := serviceMap[serviceName]; !exists {
					serviceMap[serviceName] = service
				}
			}
		}
	}
	services.Items = []api.Service{}
	for _, service := range serviceMap {
		services.Items = append(services.Items, service)
	}

	for _, e := range envvars.FromServices(&services) {
		m[e.Name] = e.Value
	}
	return m, nil
}

// Make the service environment variables for a pod in the given namespace.
func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Container) ([]kubecontainer.EnvVar, error) {
	var result []kubecontainer.EnvVar
	// Note: These are added to the docker.Config, but are not included in the checksum computed
	// by dockertools.BuildDockerName(...). That way, we can still determine whether an
	// api.Container is already running by its hash. (We don't want to restart a container just
	// because some service changed.)
	//
	// Note that there is a race between Kubelet seeing the pod and kubelet seeing the service.
	// To avoid this users can: (1) wait between starting a service and starting the pods that
	// use it; or (2) detect missing service env var and exit and be restarted; or (3) use DNS
	// instead of env vars and keep trying to resolve the DNS name of the service (recommended).
	serviceEnv, err := kl.getServiceEnvVarMap(pod.Namespace)
	if err != nil {
		return result, err
	}

	// Determine the final values of variables:
	//
	// 1.  Determine the final value of each variable:
	//     a.  If the variable's Value is set, expand the `$(var)` references to other
	//         variables in the .Value field; the sources of variables are the declared
	//         variables of the container and the service environment variables
	//     b.  If a source is defined for an environment variable, resolve the source
	// 2.  Create the container's environment in the order variables are declared
	// 3.  Add remaining service environment vars
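	//
	// Illustrative example of step 1a: with a container env of
	// [{Name: "A", Value: "foo"}, {Name: "B", Value: "$(A)-bar"}], B resolves to
	// "foo-bar", because variables declared earlier in the container (and the
	// service env vars) are available to later $(var) references.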
	tmpEnv := make(map[string]string)
	mappingFunc := expansion.MappingFuncFor(tmpEnv, serviceEnv)
	for _, envVar := range container.Env {
		// Accesses apiserver+Pods.
		// So, the master may set service env vars, or kubelet may. In case both are doing
		// it, we delete the key from the kubelet-generated ones so we don't have duplicate
		// env vars.
		// TODO: remove this net line once all platforms use apiserver+Pods.
		delete(serviceEnv, envVar.Name)

		runtimeVal := envVar.Value
		if runtimeVal != "" {
			// Step 1a: expand variable references
			runtimeVal = expansion.Expand(runtimeVal, mappingFunc)
		} else if envVar.ValueFrom != nil && envVar.ValueFrom.FieldRef != nil {
			// Step 1b: resolve alternate env var sources
			runtimeVal, err = kl.podFieldSelectorRuntimeValue(envVar.ValueFrom.FieldRef, pod)
			if err != nil {
				return result, err
			}
		}

		tmpEnv[envVar.Name] = runtimeVal
		result = append(result, kubecontainer.EnvVar{Name: envVar.Name, Value: tmpEnv[envVar.Name]})
	}

	// Append remaining service env vars.
	for k, v := range serviceEnv {
		result = append(result, kubecontainer.EnvVar{Name: k, Value: v})
	}
	return result, nil
}

func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *api.ObjectFieldSelector, pod *api.Pod) (string, error) {
	internalFieldPath, _, err := api.Scheme.ConvertFieldLabel(fs.APIVersion, "Pod", fs.FieldPath, "")
	if err != nil {
		return "", err
	}
	switch internalFieldPath {
	case "status.podIP":
		return pod.Status.PodIP, nil
	}
	return fieldpath.ExtractFieldPathAsString(pod, internalFieldPath)
}

// getClusterDNS returns a list of the DNS servers and a list of the DNS search
// domains of the cluster.
func (kl *Kubelet) getClusterDNS(pod *api.Pod) ([]string, []string, error) {
	// Get host DNS settings and append them to cluster DNS settings.
	f, err := os.Open("/etc/resolv.conf")
	if err != nil {
		return nil, nil, err
	}
	defer f.Close()
	hostDNS, hostSearch, err := parseResolvConf(f)
	if err != nil {
		return nil, nil, err
	}

	var dns, dnsSearch []string
	if kl.clusterDNS != nil {
		dns = append([]string{kl.clusterDNS.String()}, hostDNS...)
	}
	if kl.clusterDomain != "" {
		nsSvcDomain := fmt.Sprintf("%s.svc.%s", pod.Namespace, kl.clusterDomain)
		svcDomain := fmt.Sprintf("svc.%s", kl.clusterDomain)
		dnsSearch = append([]string{nsSvcDomain, svcDomain, kl.clusterDomain}, hostSearch...)
	}
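	// As an illustration of the above: for a pod in namespace "ns1" with a
	// clusterDomain of "cluster.local", the search list starts with
	// "ns1.svc.cluster.local svc.cluster.local cluster.local", followed by any
	// search domains taken from the host's /etc/resolv.conf.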
	return dns, dnsSearch, nil
}

// Returns the list of DNS servers and DNS search domains.
func parseResolvConf(reader io.Reader) (nameservers []string, searches []string, err error) {
	file, err := ioutil.ReadAll(reader)
	if err != nil {
		return nil, nil, err
	}

	// Lines of the form "nameserver 1.2.3.4" accumulate.
	nameservers = []string{}

	// Lines of the form "search example.com" overrule - last one wins.
	searches = []string{}
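	// Sketch of the behaviour implemented below: a resolv.conf containing
	//   nameserver 1.2.3.4
	//   nameserver 5.6.7.8
	//   search corp.example
	// yields nameservers ["1.2.3.4", "5.6.7.8"] and searches ["corp.example"];
	// a second "search" line would replace (not extend) the first.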
	lines := strings.Split(string(file), "\n")
	for l := range lines {
		trimmed := strings.TrimSpace(lines[l])
		if strings.HasPrefix(trimmed, "#") {
			continue
		}
		fields := strings.Fields(trimmed)
		if len(fields) == 0 {
			continue
		}
		if fields[0] == "nameserver" {
			nameservers = append(nameservers, fields[1:]...)
		}
		if fields[0] == "search" {
			searches = fields[1:]
		}
	}
	return nameservers, searches, nil
}

// Kill all running containers in a pod (includes the pod infra container).
func (kl *Kubelet) killPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
	return kl.containerRuntime.KillPod(pod, runningPod)
}

type empty struct{}

// makePodDataDirs creates the dirs for the pod data.
func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
	uid := pod.UID
	if err := os.Mkdir(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	if err := os.Mkdir(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	if err := os.Mkdir(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
		return err
	}
	return nil
}

func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
	podFullName := kubecontainer.GetPodFullName(pod)
	uid := pod.UID
	start := time.Now()
	var firstSeenTime time.Time
	if firstSeenTimeStr, ok := pod.Annotations[ConfigFirstSeenAnnotationKey]; !ok {
		glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
	} else {
		firstSeenTime = kubeletTypes.ConvertToTimestamp(firstSeenTimeStr).Get()
	}

	// Before returning, regenerate status and store it in the cache.
	defer func() {
		if isStaticPod(pod) && mirrorPod == nil {
			// No need to cache the status because the mirror pod does not
			// exist yet.
			return
		}
		status, err := kl.generatePodStatus(pod)
		if err != nil {
			glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
		} else {
			podToUpdate := pod
			if mirrorPod != nil {
				podToUpdate = mirrorPod
			}
			existingStatus, ok := kl.statusManager.GetPodStatus(podToUpdate.UID)
			if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
				!firstSeenTime.IsZero() {
				metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
			}
			kl.statusManager.SetPodStatus(podToUpdate, status)
		}
	}()

	// Kill pods we can't run.
	if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
		if err := kl.killPod(pod, runningPod); err != nil {
			util.HandleError(err)
		}
		return err
	}

	if err := kl.makePodDataDirs(pod); err != nil {
		glog.Errorf("Unable to make pod data directories for pod %q (uid %q): %v", podFullName, uid, err)
		return err
	}

	// Starting phase:
	ref, err := api.GetReference(pod)
	if err != nil {
		glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
	}

	// Mount volumes.
	podVolumes, err := kl.mountExternalVolumes(pod)
	if err != nil {
		if ref != nil {
			kl.recorder.Eventf(ref, "FailedMount", "Unable to mount volumes for pod %q: %v", podFullName, err)
		}
		glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
		return err
	}
	kl.volumeManager.SetVolumes(pod.UID, podVolumes)

	// The kubelet is the source of truth for pod status. It ignores the status sent from
	// the apiserver and regenerates status for every pod update, incrementally updating
	// the status it received at pod creation time.
	//
	// The container runtime needs 2 pieces of information from the status to sync a pod:
	// The terminated state of containers (to restart them) and the podIp (for liveness probes).
	// New pods don't have either, so we skip the expensive status generation step.
	//
	// If we end up here with a create event for an already running pod, it could result in a
	// restart of its containers. This cannot happen unless the kubelet restarts, because the
	// delete before the second create is processed by SyncPods, which cancels this pod worker.
	//
	// If the kubelet restarts, we have a bunch of running containers for which we get create
	// events. This is ok, because the pod status for these will include the podIp and terminated
	// status. Any race conditions here effectively boil down to -- the pod worker didn't sync
	// state of a newly started container with the apiserver before the kubelet restarted, so
	// it's OK to pretend like the kubelet started them after it restarted.
	//
	// Also note that deletes currently have an updateType of `create` set in UpdatePods.
	// This, again, does not matter because deletes are not processed by this method.
	var podStatus api.PodStatus
	if updateType == SyncPodCreate {
		// This is the first time we are syncing the pod. Record the latency
		// since kubelet first saw the pod if firstSeenTime is set.
		if !firstSeenTime.IsZero() {
			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
		}
		podStatus = pod.Status
		podStatus.StartTime = &util.Time{Time: start}
		kl.statusManager.SetPodStatus(pod, podStatus)
		glog.V(3).Infof("Not generating pod status for new pod %q", podFullName)
	} else {
		var err error
		podStatus, err = kl.generatePodStatus(pod)
		if err != nil {
			glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
			return err
		}
	}

	pullSecrets, err := kl.getPullSecretsForPod(pod)
	if err != nil {
		glog.Errorf("Unable to get pull secrets for pod %q (uid %q): %v", podFullName, uid, err)
		return err
	}

	err = kl.containerRuntime.SyncPod(pod, runningPod, podStatus, pullSecrets, kl.backOff)
	if err != nil {
		return err
	}

	if isStaticPod(pod) {
		if mirrorPod != nil && !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
			// The mirror pod is semantically different from the static pod. Remove
			// it. The mirror pod will get recreated later.
			glog.Errorf("Deleting mirror pod %q because it is outdated", podFullName)
			if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
				glog.Errorf("Failed deleting mirror pod %q: %v", podFullName, err)
			}
		}
		if mirrorPod == nil {
			glog.V(3).Infof("Creating a mirror pod %q", podFullName)
			if err := kl.podManager.CreateMirrorPod(pod); err != nil {
				glog.Errorf("Failed creating a mirror pod %q: %v", podFullName, err)
			}
		}
	}
	return nil
}
2015-05-08 17:53:00 +00:00
// getPullSecretsForPod inspects the Pod and retrieves the referenced pull secrets
// TODO duplicate secrets are being retrieved multiple times and there is no cache. Creating and using a secret manager interface will make this easier to address.
func ( kl * Kubelet ) getPullSecretsForPod ( pod * api . Pod ) ( [ ] api . Secret , error ) {
pullSecrets := [ ] api . Secret { }
for _ , secretRef := range pod . Spec . ImagePullSecrets {
secret , err := kl . kubeClient . Secrets ( pod . Namespace ) . Get ( secretRef . Name )
if err != nil {
2015-08-14 16:51:28 +00:00
glog . Warningf ( "Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed." , pod . Namespace , secretRef . Name , pod . Namespace , pod . Name , err )
continue
2015-05-08 17:53:00 +00:00
}
pullSecrets = append ( pullSecrets , * secret )
}
return pullSecrets , nil
}
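// Illustrative sketch, not part of the kubelet: one possible shape for the
// secret manager interface suggested by the TODO above. A cached lookup keyed
// by namespace/name would let getPullSecretsForPod avoid fetching the same
// secret once per reference. The interface name and method below are
// hypothetical.
//
//	type secretManager interface {
//		GetSecret(namespace, name string) (*api.Secret, error)
//	}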
2014-07-30 21:04:19 +00:00
// Stores all volumes defined by the set of pods into a map.
// Keys for each entry are in the format (POD_ID)/(VOLUME_NAME)
2015-04-03 22:51:50 +00:00
func getDesiredVolumes ( pods [ ] * api . Pod ) map [ string ] api . Volume {
2014-07-30 21:04:19 +00:00
desiredVolumes := make ( map [ string ] api . Volume )
2014-07-25 20:16:59 +00:00
for _ , pod := range pods {
2014-10-08 19:56:02 +00:00
for _ , volume := range pod . Spec . Volumes {
2014-11-23 15:47:25 +00:00
identifier := path . Join ( string ( pod . UID ) , volume . Name )
2014-07-30 21:04:19 +00:00
desiredVolumes [ identifier ] = volume
2014-07-25 20:16:59 +00:00
}
}
2014-07-30 21:04:19 +00:00
return desiredVolumes
2014-07-25 20:16:59 +00:00
}
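// exampleDesiredVolumeKey is an illustrative sketch, not part of the kubelet:
// it shows the map key format documented above. A pod with UID "1234-5678"
// and a volume named "data" yields the key "1234-5678/data".
func exampleDesiredVolumeKey(pod *api.Pod, volumeName string) string {
	// Same construction getDesiredVolumes uses for its map keys.
	return path.Join(string(pod.UID), volumeName)
}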
2015-04-14 22:26:50 +00:00
func ( kl * Kubelet ) cleanupOrphanedPodDirs ( pods [ ] * api . Pod ) error {
2015-01-12 00:42:11 +00:00
desired := util . NewStringSet ( )
2015-04-03 22:51:50 +00:00
for _ , pod := range pods {
desired . Insert ( string ( pod . UID ) )
2015-01-12 00:42:11 +00:00
}
found , err := kl . listPodsFromDisk ( )
if err != nil {
return err
}
errlist := [ ] error { }
for i := range found {
2015-01-14 21:53:43 +00:00
if ! desired . Has ( string ( found [ i ] ) ) {
2015-01-12 00:42:11 +00:00
glog . V ( 3 ) . Infof ( "Orphaned pod %q found, removing" , found [ i ] )
2014-11-23 15:47:25 +00:00
if err := os . RemoveAll ( kl . getPodDir ( found [ i ] ) ) ; err != nil {
2015-01-12 00:42:11 +00:00
errlist = append ( errlist , err )
}
}
}
2015-03-06 07:56:30 +00:00
return utilErrors . NewAggregate ( errlist )
2015-01-12 00:42:11 +00:00
}
2014-07-30 21:04:19 +00:00
// Compares the map of current volumes to the map of desired volumes.
// If an active volume does not have a respective desired volume, clean it up.
2015-04-29 17:47:25 +00:00
func ( kl * Kubelet ) cleanupOrphanedVolumes ( pods [ ] * api . Pod , runningPods [ ] * kubecontainer . Pod ) error {
2014-07-30 21:04:19 +00:00
desiredVolumes := getDesiredVolumes ( pods )
2014-11-23 15:47:25 +00:00
currentVolumes := kl . getPodVolumesFromDisk ( )
2015-04-29 17:47:25 +00:00
2015-02-03 20:14:16 +00:00
runningSet := util . StringSet { }
2015-04-29 17:47:25 +00:00
for _ , pod := range runningPods {
runningSet . Insert ( string ( pod . ID ) )
2015-02-03 20:14:16 +00:00
}
2015-04-29 17:47:25 +00:00
2014-07-30 21:04:19 +00:00
for name , vol := range currentVolumes {
if _ , ok := desiredVolumes [ name ] ; ! ok {
2015-02-03 20:14:16 +00:00
parts := strings . Split ( name , "/" )
if runningSet . Has ( parts [ 0 ] ) {
2015-03-19 23:51:34 +00:00
glog . Infof ( "volume %q, still has a container running %q, skipping teardown" , name , parts [ 0 ] )
2015-02-03 20:14:16 +00:00
continue
}
2014-07-30 21:04:19 +00:00
//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
//to be deleted and volumes that are leftover after a crash.
2015-01-06 00:38:47 +00:00
glog . Warningf ( "Orphaned volume %q found, tearing down volume" , name )
2015-04-16 00:40:07 +00:00
// TODO(yifan): Refactor this hacky string manipulation.
kl . volumeManager . DeleteVolumes ( types . UID ( parts [ 0 ] ) )
2014-07-30 21:04:19 +00:00
//TODO (jonesdl) This should not block other kubelet synchronization procedures
err := vol . TearDown ( )
2014-07-29 17:20:50 +00:00
if err != nil {
2015-01-06 00:38:47 +00:00
glog . Errorf ( "Could not tear down volume %q: %v" , name , err )
2014-07-29 17:20:50 +00:00
}
2014-07-25 20:16:59 +00:00
}
}
return nil
}
2015-08-20 01:57:58 +00:00
// Delete any pods that are no longer running and are marked for deletion.
func ( kl * Kubelet ) cleanupTerminatedPods ( pods [ ] * api . Pod , runningPods [ ] * kubecontainer . Pod ) error {
var terminating [ ] * api . Pod
for _ , pod := range pods {
if pod . DeletionTimestamp != nil {
found := false
for _ , runningPod := range runningPods {
if runningPod . ID == pod . UID {
found = true
break
}
}
if found {
podFullName := kubecontainer . GetPodFullName ( pod )
glog . V ( 5 ) . Infof ( "Keeping terminated pod %q and uid %q, still running" , podFullName , pod . UID )
continue
}
terminating = append ( terminating , pod )
}
}
if ! kl . statusManager . TerminatePods ( terminating ) {
return errors . New ( "not all pods were successfully terminated" )
}
return nil
}
2015-05-16 00:01:56 +00:00
// pastActiveDeadline returns true if the pod has been active for more than
// ActiveDeadlineSeconds.
func ( kl * Kubelet ) pastActiveDeadline ( pod * api . Pod ) bool {
2015-05-09 05:01:43 +00:00
now := util . Now ( )
2015-05-16 00:01:56 +00:00
if pod . Spec . ActiveDeadlineSeconds != nil {
2015-08-18 20:26:56 +00:00
podStatus , ok := kl . statusManager . GetPodStatus ( pod . UID )
2015-05-16 00:01:56 +00:00
if ! ok {
podStatus = pod . Status
2015-05-09 05:01:43 +00:00
}
2015-05-16 00:01:56 +00:00
if ! podStatus . StartTime . IsZero ( ) {
startTime := podStatus . StartTime . Time
duration := now . Time . Sub ( startTime )
allowedDuration := time . Duration ( * pod . Spec . ActiveDeadlineSeconds ) * time . Second
if duration >= allowedDuration {
return true
}
2015-05-09 05:01:43 +00:00
}
}
2015-05-16 00:01:56 +00:00
return false
}
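// examplePastDeadline is an illustrative sketch, not part of the kubelet: the
// deadline arithmetic used by pastActiveDeadline, reduced to its inputs. For
// example, a pod with activeDeadlineSeconds=600 that started 11 minutes ago
// is past its deadline.
func examplePastDeadline(startTime, now time.Time, activeDeadlineSeconds int64) bool {
	// The allowed duration is the spec value converted to a time.Duration.
	allowed := time.Duration(activeDeadlineSeconds) * time.Second
	return now.Sub(startTime) >= allowed
}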
// podIsTerminated returns true if the status is in one of the terminated states.
func podIsTerminated ( status * api . PodStatus ) bool {
if status . Phase == api . PodFailed || status . Phase == api . PodSucceeded {
return true
}
return false
2015-05-09 05:01:43 +00:00
}
2015-04-24 18:20:23 +00:00
// Filter out pods in the terminated state ("Failed" or "Succeeded").
func ( kl * Kubelet ) filterOutTerminatedPods ( allPods [ ] * api . Pod ) [ ] * api . Pod {
var pods [ ] * api . Pod
for _ , pod := range allPods {
var status api . PodStatus
// Check the cached pod status which was set after the last sync.
2015-08-18 20:26:56 +00:00
status , ok := kl . statusManager . GetPodStatus ( pod . UID )
2015-04-24 18:20:23 +00:00
if ! ok {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
// restarted.
status = pod . Status
}
2015-05-16 00:01:56 +00:00
if podIsTerminated ( & status ) {
2015-04-24 18:20:23 +00:00
continue
}
pods = append ( pods , pod )
}
return pods
}
2014-07-15 20:24:41 +00:00
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
2015-06-10 00:50:15 +00:00
func ( kl * Kubelet ) SyncPods ( allPods [ ] * api . Pod , podSyncTypes map [ types . UID ] SyncPodType ,
2015-04-20 18:20:53 +00:00
mirrorPods map [ string ] * api . Pod , start time . Time ) error {
2015-02-26 17:16:52 +00:00
defer func ( ) {
metrics . SyncPodsLatency . Observe ( metrics . SinceInMicroseconds ( start ) )
} ( )
2015-03-03 18:33:25 +00:00
2015-08-18 20:26:56 +00:00
kl . removeOrphanedPodStatuses ( allPods , mirrorPods )
2015-05-14 20:02:36 +00:00
// Handles pod admission.
pods := kl . admitPods ( allPods , podSyncTypes )
2015-06-24 22:53:25 +00:00
glog . V ( 4 ) . Infof ( "Desired pods: %s" , kubeletUtil . FormatPodNames ( pods ) )
2015-08-11 23:25:17 +00:00
// Send updates to pod workers.
kl . dispatchWork ( pods , podSyncTypes , mirrorPods , start )
// Clean up unwanted/orphaned resources.
if err := kl . cleanupPods ( allPods , pods ) ; err != nil {
2014-07-15 17:26:56 +00:00
return err
}
2015-08-11 23:25:17 +00:00
return nil
}
2014-07-15 17:26:56 +00:00
2015-08-11 23:25:17 +00:00
// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
2015-08-18 20:26:56 +00:00
func ( kl * Kubelet ) removeOrphanedPodStatuses ( pods [ ] * api . Pod , mirrorPods map [ string ] * api . Pod ) {
podUIDs := make ( map [ types . UID ] bool )
2015-08-11 23:25:17 +00:00
for _ , pod := range pods {
2015-08-18 20:26:56 +00:00
podUIDs [ pod . UID ] = true
2015-08-11 23:25:17 +00:00
}
2015-08-18 20:26:56 +00:00
for _ , pod := range mirrorPods {
podUIDs [ pod . UID ] = true
}
kl . statusManager . RemoveOrphanedStatuses ( podUIDs )
2015-08-11 23:25:17 +00:00
}
// dispatchWork dispatches pod updates to workers.
func ( kl * Kubelet ) dispatchWork ( pods [ ] * api . Pod , podSyncTypes map [ types . UID ] SyncPodType ,
mirrorPods map [ string ] * api . Pod , start time . Time ) {
2014-07-01 05:27:56 +00:00
// Check for any containers that need starting
2015-04-03 22:51:50 +00:00
for _ , pod := range pods {
2015-03-23 17:14:30 +00:00
podFullName := kubecontainer . GetPodFullName ( pod )
2014-07-18 18:42:47 +00:00
// Run the sync in an async pod worker.
2015-04-20 18:20:53 +00:00
kl . podWorkers . UpdatePod ( pod , mirrorPods [ podFullName ] , func ( ) {
2015-06-09 21:01:23 +00:00
metrics . PodWorkerLatency . WithLabelValues ( podSyncTypes [ pod . UID ] . String ( ) ) . Observe ( metrics . SinceInMicroseconds ( start ) )
2015-02-24 23:29:18 +00:00
} )
2015-02-26 17:25:01 +00:00
// Note the number of containers for new pods.
2015-06-10 00:50:15 +00:00
if val , ok := podSyncTypes [ pod . UID ] ; ok && ( val == SyncPodCreate ) {
2015-02-26 17:25:01 +00:00
metrics . ContainersPerPodCount . Observe ( float64 ( len ( pod . Spec . Containers ) ) )
}
2014-07-01 16:37:45 +00:00
}
2015-08-11 23:25:17 +00:00
}
// cleanupPods performs a series of cleanup work, including terminating pod
// workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
func ( kl * Kubelet ) cleanupPods ( allPods [ ] * api . Pod , admittedPods [ ] * api . Pod ) error {
desiredPods := make ( map [ types . UID ] empty )
for _ , pod := range admittedPods {
desiredPods [ pod . UID ] = empty { }
}
2015-02-27 09:19:41 +00:00
// Stop the workers for no-longer existing pods.
kl . podWorkers . ForgetNonExistingPodWorkers ( desiredPods )
2015-03-05 18:49:36 +00:00
if ! kl . sourcesReady ( ) {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog . V ( 4 ) . Infof ( "Skipping deletes, sources aren't ready yet." )
return nil
}
2015-02-26 20:27:14 +00:00
2015-08-11 23:25:17 +00:00
runningPods , err := kl . runtimeCache . GetPods ( )
if err != nil {
glog . Errorf ( "Error listing containers: %#v" , err )
return err
}
2015-04-29 17:47:25 +00:00
// Kill containers associated with unwanted pods.
err = kl . killUnwantedPods ( desiredPods , runningPods )
2015-02-03 20:14:16 +00:00
if err != nil {
2015-04-14 22:26:50 +00:00
glog . Errorf ( "Failed killing unwanted containers: %v" , err )
2015-04-29 17:47:25 +00:00
}
// Note that we just killed the unwanted pods. This may not yet be reflected
2015-05-11 17:50:14 +00:00
// in the cache. We need to bypass the cache to get the latest set of
2015-04-29 17:47:25 +00:00
// running pods to clean up the volumes.
// TODO: Evaluate the performance impact of bypassing the runtime cache.
2015-05-01 22:25:11 +00:00
runningPods , err = kl . containerRuntime . GetPods ( false )
2015-04-29 17:47:25 +00:00
if err != nil {
glog . Errorf ( "Error listing containers: %#v" , err )
2015-02-03 20:14:16 +00:00
return err
}
2015-02-04 01:46:28 +00:00
// Remove any orphaned volumes.
2015-05-18 20:12:35 +00:00
// Note that we pass all pods (including terminated pods) to the function,
// so that we don't remove volumes associated with terminated but not yet
// deleted pods.
err = kl . cleanupOrphanedVolumes ( allPods , runningPods )
2015-01-12 00:42:11 +00:00
if err != nil {
2015-04-14 22:26:50 +00:00
glog . Errorf ( "Failed cleaning up orphaned volumes: %v" , err )
2015-01-12 00:42:11 +00:00
return err
}
2015-04-14 22:26:50 +00:00
// Remove any orphaned pod directories.
2015-05-18 20:12:35 +00:00
// Note that we pass all pods (including terminated pods) to the function,
// so that we don't remove directories associated with terminated but not yet
// deleted pods.
err = kl . cleanupOrphanedPodDirs ( allPods )
2015-01-12 00:42:11 +00:00
if err != nil {
2015-04-14 22:26:50 +00:00
glog . Errorf ( "Failed cleaning up orphaned pod directories: %v" , err )
2015-01-12 00:42:11 +00:00
return err
}
2014-07-30 21:04:19 +00:00
2015-03-09 22:46:47 +00:00
// Remove any orphaned mirror pods.
2015-03-23 19:17:12 +00:00
kl . podManager . DeleteOrphanedMirrorPods ( )
2015-03-09 22:46:47 +00:00
2015-08-20 01:57:58 +00:00
if err := kl . cleanupTerminatedPods ( allPods , runningPods ) ; err != nil {
glog . Errorf ( "Failed to cleanup terminated pods: %v" , err )
}
2015-08-13 12:59:15 +00:00
kl . backOff . GC ( )
2014-06-06 23:40:48 +00:00
return err
}
2015-04-29 17:47:25 +00:00
// killUnwantedPods kills the unwanted, running pods in parallel.
2015-04-14 22:26:50 +00:00
func ( kl * Kubelet ) killUnwantedPods ( desiredPods map [ types . UID ] empty ,
2015-04-29 17:47:25 +00:00
runningPods [ ] * kubecontainer . Pod ) error {
ch := make ( chan error , len ( runningPods ) )
2015-04-14 22:26:50 +00:00
defer close ( ch )
numWorkers := 0
for _ , pod := range runningPods {
if _ , found := desiredPods [ pod . ID ] ; found {
// Per-pod workers will handle the desired pods.
continue
}
numWorkers ++
2015-04-29 17:47:25 +00:00
go func ( pod * kubecontainer . Pod , ch chan error ) {
var err error
2015-04-14 22:26:50 +00:00
defer func ( ) {
2015-04-29 17:47:25 +00:00
ch <- err
2015-04-14 22:26:50 +00:00
} ( )
glog . V ( 1 ) . Infof ( "Killing unwanted pod %q" , pod . Name )
// Stop the containers.
2015-08-20 01:57:58 +00:00
err = kl . killPod ( nil , * pod )
2015-04-14 22:26:50 +00:00
if err != nil {
glog . Errorf ( "Failed killing the pod %q: %v" , pod . Name , err )
return
}
} ( pod , ch )
}
2015-04-29 17:47:25 +00:00
// Aggregate errors from the pod killing workers.
2015-04-14 22:26:50 +00:00
var errs [ ] error
for i := 0 ; i < numWorkers ; i ++ {
2015-04-29 17:47:25 +00:00
err := <- ch
if err != nil {
errs = append ( errs , err )
2015-04-14 22:26:50 +00:00
}
}
2015-04-29 17:47:25 +00:00
return utilErrors . NewAggregate ( errs )
2015-04-14 22:26:50 +00:00
}
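// exampleFanOut is an illustrative sketch, not part of the kubelet: the
// fan-out/fan-in pattern used by killUnwantedPods, reduced to its essentials.
// Each worker sends exactly one (possibly nil) error on a channel buffered to
// the number of workers, and the caller drains one result per worker before
// aggregating, so no goroutine can block on send.
func exampleFanOut(tasks []func() error) []error {
	ch := make(chan error, len(tasks))
	for _, t := range tasks {
		go func(task func() error) { ch <- task() }(t)
	}
	var errs []error
	for range tasks {
		if err := <-ch; err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}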
2015-04-03 22:51:50 +00:00
type podsByCreationTime [ ] * api . Pod
2015-02-27 21:43:21 +00:00
func ( s podsByCreationTime ) Len ( ) int {
return len ( s )
}
func ( s podsByCreationTime ) Swap ( i , j int ) {
s [ i ] , s [ j ] = s [ j ] , s [ i ]
}
func ( s podsByCreationTime ) Less ( i , j int ) bool {
return s [ i ] . CreationTimestamp . Before ( s [ j ] . CreationTimestamp )
}
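// Usage note (illustrative, not part of the kubelet): podsByCreationTime
// implements sort.Interface, so callers order pods oldest-first with
//
//	sort.Sort(podsByCreationTime(pods))
//
// checkHostPortConflicts and admitPods rely on this ordering so that older
// pods win when resolving conflicts.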
2015-03-20 16:52:32 +00:00
// checkHostPortConflicts detects pods with conflicting host ports.
2015-04-03 22:51:50 +00:00
func checkHostPortConflicts ( pods [ ] * api . Pod ) ( fitting [ ] * api . Pod , notFitting [ ] * api . Pod ) {
2015-05-19 17:17:53 +00:00
ports := util . StringSet { }
2015-02-27 21:43:21 +00:00
// Respect the pod creation order when resolving conflicts.
sort . Sort ( podsByCreationTime ( pods ) )
2015-04-03 22:51:50 +00:00
for _ , pod := range pods {
2015-05-19 17:17:53 +00:00
if errs := validation . AccumulateUniqueHostPorts ( pod . Spec . Containers , & ports ) ; len ( errs ) != 0 {
2015-03-23 17:14:30 +00:00
glog . Errorf ( "Pod %q: HostPort is already allocated, ignoring: %v" , kubecontainer . GetPodFullName ( pod ) , errs )
2015-04-03 22:51:50 +00:00
notFitting = append ( notFitting , pod )
2014-07-15 20:24:41 +00:00
continue
2014-07-08 04:48:47 +00:00
}
2015-04-03 22:51:50 +00:00
fitting = append ( fitting , pod )
2014-07-08 04:48:47 +00:00
}
2015-03-20 16:52:32 +00:00
return
2015-03-03 18:33:25 +00:00
}
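// Worked example (illustrative, not part of the kubelet): if an older pod and
// a newer pod both request hostPort 8080, AccumulateUniqueHostPorts accepts
// the older pod (processed first because of the creation-time sort) and
// reports an error for the newer one, which lands in notFitting and is later
// failed by handleNotFittingPods with reason "HostPortConflict".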
2015-07-24 01:27:29 +00:00
// checkSufficientfFreeResources detects pods that exceed the node's free resources.
func ( kl * Kubelet ) checkSufficientfFreeResources ( pods [ ] * api . Pod ) ( fitting [ ] * api . Pod , notFittingCPU , notFittingMemory [ ] * api . Pod ) {
2015-03-16 12:50:00 +00:00
info , err := kl . GetCachedMachineInfo ( )
if err != nil {
2015-03-31 22:32:02 +00:00
glog . Errorf ( "error getting machine info: %v" , err )
2015-07-24 01:27:29 +00:00
return pods , nil , nil
2015-03-16 12:50:00 +00:00
}
// Respect the pod creation order when resolving conflicts.
sort . Sort ( podsByCreationTime ( pods ) )
capacity := CapacityFromMachineInfo ( info )
2015-07-24 01:27:29 +00:00
return predicates . CheckPodsExceedingFreeResources ( pods , capacity )
2015-03-20 16:52:32 +00:00
}
2015-05-12 08:24:08 +00:00
// handleOutOfDisk detects if pods can't fit due to lack of disk space.
2015-06-10 00:50:15 +00:00
func ( kl * Kubelet ) handleOutOfDisk ( pods [ ] * api . Pod , podSyncTypes map [ types . UID ] SyncPodType ) [ ] * api . Pod {
2015-05-12 08:24:08 +00:00
if len ( podSyncTypes ) == 0 {
// regular sync. no new pods
2015-05-14 20:02:36 +00:00
return pods
2015-05-12 08:24:08 +00:00
}
outOfDockerDisk := false
outOfRootDisk := false
// Check disk space once globally and reject or accept all new pods.
withinBounds , err := kl . diskSpaceManager . IsDockerDiskSpaceAvailable ( )
// Assume enough space in case of errors.
if err == nil && ! withinBounds {
outOfDockerDisk = true
}
withinBounds , err = kl . diskSpaceManager . IsRootDiskSpaceAvailable ( )
// Assume enough space in case of errors.
if err == nil && ! withinBounds {
outOfRootDisk = true
}
// Kubelet would indicate all pods as newly created on the first run after restart.
// We ignore the first disk check to ensure that running pods are not killed.
// Disk manager will only declare out of disk problems if unfreeze has been called.
kl . diskSpaceManager . Unfreeze ( )
2015-05-14 20:02:36 +00:00
if ! outOfDockerDisk && ! outOfRootDisk {
// Disk space is fine.
return pods
}
var fitting [ ] * api . Pod
for i := range pods {
pod := pods [ i ]
// Only reject pods that didn't start yet.
2015-06-10 00:50:15 +00:00
if podSyncTypes [ pod . UID ] == SyncPodCreate {
2015-06-09 15:58:16 +00:00
reason := "OutOfDisk"
kl . recorder . Eventf ( pod , reason , "Cannot start the pod due to lack of disk space." )
2015-05-14 20:02:36 +00:00
kl . statusManager . SetPodStatus ( pod , api . PodStatus {
Phase : api . PodFailed ,
2015-06-09 15:58:16 +00:00
Reason : reason ,
2015-05-14 20:02:36 +00:00
Message : "Pod cannot be started due to lack of disk space." } )
continue
2015-05-12 08:24:08 +00:00
}
2015-05-14 20:02:36 +00:00
fitting = append ( fitting , pod )
2015-05-12 08:24:08 +00:00
}
2015-05-14 20:02:36 +00:00
return fitting
2015-05-12 08:24:08 +00:00
}
2015-03-20 16:52:32 +00:00
// checkNodeSelectorMatching detects pods that do not match the node's labels.
2015-04-03 22:51:50 +00:00
func ( kl * Kubelet ) checkNodeSelectorMatching ( pods [ ] * api . Pod ) ( fitting [ ] * api . Pod , notFitting [ ] * api . Pod ) {
2015-06-13 00:28:34 +00:00
if kl . standaloneMode {
return pods , notFitting
}
2015-03-20 16:52:32 +00:00
node , err := kl . GetNode ( )
if err != nil {
glog . Errorf ( "error getting node: %v" , err )
2015-04-03 22:51:50 +00:00
return pods , nil
2015-03-20 16:52:32 +00:00
}
for _ , pod := range pods {
2015-05-08 11:01:09 +00:00
if ! predicates . PodMatchesNodeLabels ( pod , node ) {
2015-03-20 16:52:32 +00:00
notFitting = append ( notFitting , pod )
continue
}
fitting = append ( fitting , pod )
}
return
2015-03-16 12:50:00 +00:00
}
2015-05-14 20:02:36 +00:00
// handleNotFittingPods handles pods that do not fit on the node and returns
// the pods that fit. It currently checks host port conflicts, node selector
// mismatches, and exceeded node capacity.
func ( kl * Kubelet ) handleNotFittingPods ( pods [ ] * api . Pod ) [ ] * api . Pod {
2015-03-20 16:52:32 +00:00
fitting , notFitting := checkHostPortConflicts ( pods )
for _ , pod := range notFitting {
2015-06-09 15:58:16 +00:00
reason := "HostPortConflict"
kl . recorder . Eventf ( pod , reason , "Cannot start the pod due to host port conflict." )
2015-04-03 22:51:50 +00:00
kl . statusManager . SetPodStatus ( pod , api . PodStatus {
2015-03-03 18:33:25 +00:00
Phase : api . PodFailed ,
2015-06-09 15:58:16 +00:00
Reason : reason ,
2015-03-03 18:33:25 +00:00
Message : "Pod cannot be started due to host port conflict" } )
2015-03-16 12:50:00 +00:00
}
2015-03-20 16:52:32 +00:00
fitting , notFitting = kl . checkNodeSelectorMatching ( fitting )
for _ , pod := range notFitting {
2015-06-09 15:58:16 +00:00
reason := "NodeSelectorMismatching"
kl . recorder . Eventf ( pod , reason , "Cannot start the pod due to node selector mismatch." )
2015-04-03 22:51:50 +00:00
kl . statusManager . SetPodStatus ( pod , api . PodStatus {
2015-03-20 16:52:32 +00:00
Phase : api . PodFailed ,
2015-06-09 15:58:16 +00:00
Reason : reason ,
2015-03-20 16:52:32 +00:00
Message : "Pod cannot be started due to node selector mismatch" } )
2015-03-16 12:50:00 +00:00
}
2015-07-24 01:27:29 +00:00
fitting , notFittingCPU , notFittingMemory := kl . checkSufficientfFreeResources ( fitting )
for _ , pod := range notFittingCPU {
reason := "InsufficientFreeCPU"
kl . recorder . Eventf ( pod , reason , "Cannot start the pod due to insufficient free CPU." )
kl . statusManager . SetPodStatus ( pod , api . PodStatus {
Phase : api . PodFailed ,
Reason : reason ,
Message : "Pod cannot be started due to insufficient free CPU" } )
}
for _ , pod := range notFittingMemory {
reason := "InsufficientFreeMemory"
kl . recorder . Eventf ( pod , reason , "Cannot start the pod due to insufficient free memory." )
2015-04-03 22:51:50 +00:00
kl . statusManager . SetPodStatus ( pod , api . PodStatus {
2015-03-16 12:50:00 +00:00
Phase : api . PodFailed ,
2015-06-09 15:58:16 +00:00
Reason : reason ,
2015-07-24 01:27:29 +00:00
Message : "Pod cannot be started due to insufficient free memory" } )
2015-03-03 18:33:25 +00:00
}
2015-05-14 20:02:36 +00:00
return fitting
}
// admitPods handles pod admission. It filters out terminated pods and pods
// that don't fit on the node, and may reject pods if the node is overcommitted.
2015-06-10 00:50:15 +00:00
func ( kl * Kubelet ) admitPods ( allPods [ ] * api . Pod , podSyncTypes map [ types . UID ] SyncPodType ) [ ] * api . Pod {
2015-05-14 20:02:36 +00:00
// Pod phase progresses monotonically. Once a pod has reached a final state,
2015-08-08 21:29:57 +00:00
// it should never leave that state, regardless of the restart policy. The statuses
2015-05-14 20:02:36 +00:00
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
// 1. If the containers were removed immediately after they died, kubelet
// may fail to generate correct statuses, let alone filtering correctly.
// 2. If kubelet restarted before writing the terminated status for a pod
// to the apiserver, it could still restart the terminated pod (even
// though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
pods := kl . filterOutTerminatedPods ( allPods )
// Respect the pod creation order when resolving conflicts.
sort . Sort ( podsByCreationTime ( pods ) )
// Reject pods that we cannot run.
// handleNotFittingPods relies on static information (e.g. immutable fields
// in the pod specs or machine information that doesn't change without
// rebooting), and the pods are sorted by immutable creation time. Hence it
// should only reject new pods without checking the pod sync types.
fitting := kl . handleNotFittingPods ( pods )
// Reject new creation requests if diskspace is running low.
admittedPods := kl . handleOutOfDisk ( fitting , podSyncTypes )
return admittedPods
2014-07-08 04:48:47 +00:00
}
2014-07-01 20:01:39 +00:00
// syncLoop is the main loop for processing changes. It watches for changes from
2015-03-11 23:40:20 +00:00
// three channels (file, apiserver, and http) and creates a union of them. For
2014-06-06 23:40:48 +00:00
// any new change seen, it runs a sync against the desired state and running state. If
// no changes are seen to the configuration, it synchronizes the last known desired
2015-08-11 20:29:50 +00:00
// state every sync-frequency seconds. Never returns.
2014-07-15 20:24:41 +00:00
func ( kl * Kubelet ) syncLoop ( updates <- chan PodUpdate , handler SyncHandler ) {
2015-04-08 20:57:19 +00:00
glog . Info ( "Starting kubelet main sync loop." )
2014-06-06 23:40:48 +00:00
for {
2015-06-17 22:31:46 +00:00
kl . syncLoopIteration ( updates , handler )
}
}
func ( kl * Kubelet ) syncLoopIteration ( updates <- chan PodUpdate , handler SyncHandler ) {
kl . syncLoopMonitor . Store ( time . Now ( ) )
if ! kl . containerRuntimeUp ( ) {
time . Sleep ( 5 * time . Second )
glog . Infof ( "Skipping pod synchronization, container runtime is not up." )
return
}
2015-06-24 18:10:10 +00:00
if ! kl . doneNetworkConfigure ( ) {
time . Sleep ( 5 * time . Second )
glog . Infof ( "Skipping pod synchronization, network is not configured" )
return
}
2015-06-17 22:31:46 +00:00
unsyncedPod := false
podSyncTypes := make ( map [ types . UID ] SyncPodType )
select {
case u , ok := <- updates :
if ! ok {
glog . Errorf ( "Update channel is closed. Exiting the sync loop." )
return
2015-06-18 05:34:11 +00:00
}
2015-06-17 22:31:46 +00:00
kl . podManager . UpdatePods ( u , podSyncTypes )
unsyncedPod = true
kl . syncLoopMonitor . Store ( time . Now ( ) )
case <- time . After ( kl . resyncInterval ) :
glog . V ( 4 ) . Infof ( "Periodic sync" )
}
start := time . Now ( )
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for unsyncedPod {
2014-06-06 23:40:48 +00:00
select {
2015-06-17 22:31:46 +00:00
case u := <- updates :
2015-03-21 00:22:02 +00:00
kl . podManager . UpdatePods ( u , podSyncTypes )
2015-06-17 22:31:46 +00:00
kl . syncLoopMonitor . Store ( time . Now ( ) )
case <- time . After ( 5 * time . Millisecond ) :
// Break the for loop.
unsyncedPod = false
2014-06-06 23:40:48 +00:00
}
}
2015-06-17 22:31:46 +00:00
pods , mirrorPods := kl . podManager . GetPodsAndMirrorMap ( )
kl . syncLoopMonitor . Store ( time . Now ( ) )
if err := handler . SyncPods ( pods , podSyncTypes , mirrorPods , start ) ; err != nil {
glog . Errorf ( "Couldn't sync containers: %v" , err )
}
kl . syncLoopMonitor . Store ( time . Now ( ) )
}
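// Illustrative note, not part of the kubelet: syncLoopIteration batches
// updates. After the first update arrives it keeps draining the channel until
// 5ms pass without a new update, then issues a single SyncPods call for the
// whole batch, so a burst of config changes does not trigger one full sync
// per update.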
func ( kl * Kubelet ) LatestLoopEntryTime ( ) time . Time {
val := kl . syncLoopMonitor . Load ( )
if val == nil {
return time . Time { }
}
return val . ( time . Time )
2014-06-06 23:40:48 +00:00
}
2015-04-21 20:02:50 +00:00
// Returns the container runtime version for this Kubelet.
func ( kl * Kubelet ) GetContainerRuntimeVersion ( ) ( kubecontainer . Version , error ) {
2015-05-01 22:25:11 +00:00
if kl . containerRuntime == nil {
2015-04-21 20:02:50 +00:00
return nil , fmt . Errorf ( "no container runtime" )
2015-02-04 17:14:17 +00:00
}
2015-05-01 22:25:11 +00:00
return kl . containerRuntime . Version ( )
2015-02-04 17:14:17 +00:00
}
2015-02-24 00:33:43 +00:00
func ( kl * Kubelet ) validatePodPhase ( podStatus * api . PodStatus ) error {
switch podStatus . Phase {
case api . PodRunning , api . PodSucceeded , api . PodFailed :
return nil
}
return fmt . Errorf ( "pod is not in 'Running', 'Succeeded' or 'Failed' state - State: %q" , podStatus . Phase )
}
2015-05-07 18:34:16 +00:00
func ( kl * Kubelet ) validateContainerStatus ( podStatus * api . PodStatus , containerName string , previous bool ) ( containerID string , err error ) {
var cID string
2015-03-25 11:09:35 +00:00
cStatus , found := api . GetContainerStatus ( podStatus . ContainerStatuses , containerName )
if ! found {
return "" , fmt . Errorf ( "container %q not found in pod" , containerName )
2015-02-24 00:33:43 +00:00
}
2015-05-07 18:34:16 +00:00
if previous {
2015-05-27 22:02:11 +00:00
if cStatus . LastTerminationState . Terminated == nil {
2015-05-07 18:34:16 +00:00
return "" , fmt . Errorf ( "previous terminated container %q not found in pod" , containerName )
}
2015-05-27 22:02:11 +00:00
cID = cStatus . LastTerminationState . Terminated . ContainerID
2015-05-07 18:34:16 +00:00
} else {
if cStatus . State . Waiting != nil {
return "" , fmt . Errorf ( "container %q is in waiting state." , containerName )
}
cID = cStatus . ContainerID
2015-03-25 11:09:35 +00:00
}
2015-05-07 18:34:16 +00:00
return kubecontainer . TrimRuntimePrefix ( cID ) , nil
2015-02-24 00:33:43 +00:00
}
2014-08-27 19:41:32 +00:00
// GetKubeletContainerLogs returns logs from the container
2015-01-07 15:18:56 +00:00
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
// or all of them.
2015-05-07 18:34:16 +00:00
func ( kl * Kubelet ) GetKubeletContainerLogs ( podFullName , containerName , tail string , follow , previous bool , stdout , stderr io . Writer ) error {
2015-05-01 23:12:14 +00:00
// TODO(vmarmol): Refactor to not need the pod status and verification.
2015-05-15 22:30:28 +00:00
// Pod workers periodically write status to statusManager. If status is not
// cached there, something is wrong (or kubelet just restarted and hasn't
// caught up yet). Just assume the pod is not ready yet.
2015-08-18 20:26:56 +00:00
name , namespace , err := kubecontainer . ParsePodFullName ( podFullName )
if err != nil {
return fmt . Errorf ( "unable to parse pod full name %q: %v" , podFullName , err )
}
pod , ok := kl . GetPodByName ( namespace , name )
if ! ok {
return fmt . Errorf ( "unable to get logs for container %q in pod %q: unable to find pod" , containerName , podFullName )
}
podStatus , found := kl . statusManager . GetPodStatus ( pod . UID )
2015-05-15 22:30:28 +00:00
if ! found {
return fmt . Errorf ( "failed to get status for pod %q" , podFullName )
2014-09-17 19:00:09 +00:00
}
2015-08-18 20:26:56 +00:00
2015-02-24 00:33:43 +00:00
if err := kl . validatePodPhase ( & podStatus ) ; err != nil {
2015-04-09 18:57:53 +00:00
// No log is available if pod is not in a "known" phase (e.g. Unknown).
2015-02-24 00:33:43 +00:00
return err
2015-02-12 01:03:59 +00:00
}
2015-05-07 18:34:16 +00:00
containerID , err := kl . validateContainerStatus ( & podStatus , containerName , previous )
2015-02-24 00:33:43 +00:00
if err != nil {
2015-04-09 18:57:53 +00:00
// No log is available if the container status is missing or is in the
// waiting state.
2015-02-24 00:33:43 +00:00
return err
2015-02-12 01:03:59 +00:00
}
2015-05-01 23:07:05 +00:00
return kl . containerRuntime . GetContainerLogs ( pod , containerID , tail , follow , stdout , stderr )
2014-08-27 19:41:32 +00:00
}
2015-02-09 16:40:42 +00:00
// GetHostname returns the hostname as the kubelet sees it.
func ( kl * Kubelet ) GetHostname ( ) string {
return kl . hostname
}
2015-03-24 12:35:38 +00:00
// Returns host IP or nil in case of error.
func ( kl * Kubelet ) GetHostIP ( ) ( net . IP , error ) {
node , err := kl . GetNode ( )
if err != nil {
2015-04-14 05:44:23 +00:00
return nil , fmt . Errorf ( "cannot get node: %v" , err )
2015-03-24 12:35:38 +00:00
}
2015-05-26 23:13:00 +00:00
return nodeutil . GetNodeHostIP ( node )
2015-03-24 12:35:38 +00:00
}
2015-03-09 22:46:47 +00:00
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
2015-03-23 19:17:12 +00:00
// pods.
2015-04-03 22:51:50 +00:00
func ( kl * Kubelet ) GetPods ( ) [ ] * api . Pod {
2015-03-21 00:22:02 +00:00
return kl . podManager . GetPods ( )
2014-10-22 23:52:38 +00:00
}
2015-06-23 23:01:12 +00:00
// GetRunningPods returns all pods running on the kubelet, as reported by the
// container runtime cache. This function converts kubecontainer.Pod to
// api.Pod, so only the fields that exist in both kubecontainer.Pod and
// api.Pod are considered meaningful.
func ( kl * Kubelet ) GetRunningPods ( ) ( [ ] * api . Pod , error ) {
pods , err := kl . runtimeCache . GetPods ( )
if err != nil {
return nil , err
}
apiPods := make ( [ ] * api . Pod , 0 , len ( pods ) )
for _ , pod := range pods {
apiPods = append ( apiPods , pod . ToAPIPod ( ) )
}
return apiPods , nil
}
2015-03-19 23:51:34 +00:00
func ( kl * Kubelet ) GetPodByFullName ( podFullName string ) ( * api . Pod , bool ) {
2015-03-21 00:22:02 +00:00
return kl . podManager . GetPodByFullName ( podFullName )
2015-03-19 23:51:34 +00:00
}
// GetPodByName provides the first pod that matches namespace and name, as well
// as whether the pod was found.
2015-03-13 13:19:07 +00:00
func ( kl * Kubelet ) GetPodByName ( namespace , name string ) ( * api . Pod , bool ) {
2015-03-21 00:22:02 +00:00
return kl . podManager . GetPodByName ( namespace , name )
2015-01-07 15:18:56 +00:00
}
2015-05-05 10:19:54 +00:00
func ( kl * Kubelet ) updateRuntimeUp ( ) {
2015-06-21 20:22:16 +00:00
start := time . Now ( )
2015-05-05 10:19:54 +00:00
err := waitUntilRuntimeIsUp ( kl . containerRuntime , 100 * time . Millisecond )
kl . runtimeMutex . Lock ( )
defer kl . runtimeMutex . Unlock ( )
if err == nil {
kl . lastTimestampRuntimeUp = time . Now ( )
2015-06-21 20:22:16 +00:00
} else {
glog . Errorf ( "Container runtime sanity check failed after %v, err: %v" , time . Since ( start ) , err )
2015-05-05 10:19:54 +00:00
}
}
2015-05-11 21:07:24 +00:00
func ( kl * Kubelet ) reconcileCBR0 ( podCIDR string ) error {
2015-05-08 18:47:33 +00:00
if podCIDR == "" {
glog . V ( 5 ) . Info ( "PodCIDR not set. Will not configure cbr0." )
return nil
}
2015-06-24 18:10:10 +00:00
glog . V ( 5 ) . Infof ( "PodCIDR is set to %q" , podCIDR )
2015-05-08 18:47:33 +00:00
_ , cidr , err := net . ParseCIDR ( podCIDR )
if err != nil {
return err
}
// Set cbr0 interface address to first address in IPNet
cidr . IP . To4 ( ) [ 3 ] += 1
if err := ensureCbr0 ( cidr ) ; err != nil {
return err
}
return nil
}
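// exampleCbr0Address is an illustrative sketch, not part of the kubelet: the
// address derivation performed by reconcileCBR0. For a podCIDR of
// "10.244.1.0/24", net.ParseCIDR yields the network address 10.244.1.0 and
// incrementing the last octet gives 10.244.1.1 for the cbr0 bridge.
func exampleCbr0Address(podCIDR string) (net.IP, error) {
	_, cidr, err := net.ParseCIDR(podCIDR)
	if err != nil {
		return nil, err
	}
	ip := cidr.IP.To4()
	if ip == nil {
		return nil, fmt.Errorf("podCIDR %q is not an IPv4 network", podCIDR)
	}
	// First usable address in the subnet becomes the bridge address.
	ip[3]++
	return ip, nil
}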
2015-02-23 21:04:45 +00:00
// updateNodeStatus updates node status to master with retries.
func ( kl * Kubelet ) updateNodeStatus ( ) error {
for i := 0 ; i < nodeStatusUpdateRetry ; i ++ {
2015-05-20 21:21:03 +00:00
if err := kl . tryUpdateNodeStatus ( ) ; err != nil {
glog . Errorf ( "Error updating node status, will retry: %v" , err )
2015-02-23 21:04:45 +00:00
} else {
return nil
}
}
2015-05-20 21:21:03 +00:00
return fmt . Errorf ( "update node status exceeds retry count" )
2015-02-23 21:04:45 +00:00
}
2015-04-22 14:48:38 +00:00
func ( kl * Kubelet ) recordNodeStatusEvent ( event string ) {
2015-06-12 15:40:34 +00:00
glog . V ( 2 ) . Infof ( "Recording %s event message for node %s" , event , kl . nodeName )
2015-03-30 13:21:01 +00:00
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
2015-06-12 15:40:34 +00:00
kl . recorder . Eventf ( kl . nodeRef , event , "Node %s status is now: %s" , kl . nodeName , event )
2015-04-09 01:22:44 +00:00
}
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
var oldNodeUnschedulable bool
2015-06-24 18:10:10 +00:00
func ( kl * Kubelet ) syncNetworkStatus ( ) {
kl . networkConfigMutex . Lock ( )
defer kl . networkConfigMutex . Unlock ( )
networkConfigured := true
if kl . configureCBR0 {
2015-06-24 19:56:36 +00:00
if err := ensureIPTablesMasqRule ( ) ; err != nil {
networkConfigured = false
glog . Errorf ( "Error on adding ip table rules: %v" , err )
}
2015-06-24 18:10:10 +00:00
if len ( kl . podCIDR ) == 0 {
2015-07-08 21:58:14 +00:00
glog . Warningf ( "ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now" )
2015-06-24 18:10:10 +00:00
networkConfigured = false
} else if err := kl . reconcileCBR0 ( kl . podCIDR ) ; err != nil {
networkConfigured = false
glog . Errorf ( "Error configuring cbr0: %v" , err )
}
}
kl . networkConfigured = networkConfigured
}
2015-05-20 21:21:03 +00:00
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
func ( kl * Kubelet ) setNodeStatus ( node * api . Node ) error {
// Set addresses for the node.
if kl . cloud != nil {
instances , ok := kl . cloud . Instances ( )
if ! ok {
return fmt . Errorf ( "failed to get instances from cloud provider" )
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
2015-06-12 15:42:38 +00:00
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
2015-06-12 15:40:34 +00:00
nodeAddresses , err := instances . NodeAddresses ( kl . nodeName )
2015-05-20 21:21:03 +00:00
if err != nil {
return fmt . Errorf ( "failed to get node address from cloud provider: %v" , err )
}
node . Status . Addresses = nodeAddresses
} else {
addr := net . ParseIP ( kl . hostname )
if addr != nil {
2015-08-07 01:08:56 +00:00
node . Status . Addresses = [ ] api . NodeAddress {
{ Type : api . NodeLegacyHostIP , Address : addr . String ( ) } ,
{ Type : api . NodeInternalIP , Address : addr . String ( ) } ,
}
2015-05-20 21:21:03 +00:00
} else {
addrs , err := net . LookupIP ( node . Name )
if err != nil {
return fmt . Errorf ( "can't get ip address of node %s: %v" , node . Name , err )
} else if len ( addrs ) == 0 {
return fmt . Errorf ( "no ip address for node %v" , node . Name )
} else {
2015-07-23 18:28:27 +00:00
// Check all IP addresses for this node.Name and try to find the first non-loopback IPv4 address.
// If no match is found, fall back to the IP of the interface that has a gateway on it.
for _ , ip := range addrs {
if ip . IsLoopback ( ) {
continue
}
if ip . To4 ( ) != nil {
2015-08-07 01:08:56 +00:00
node . Status . Addresses = [ ] api . NodeAddress {
{ Type : api . NodeLegacyHostIP , Address : ip . String ( ) } ,
{ Type : api . NodeInternalIP , Address : ip . String ( ) } ,
}
2015-07-23 18:28:27 +00:00
break
}
}
if len ( node . Status . Addresses ) == 0 {
ip , err := util . ChooseHostInterface ( )
if err != nil {
return err
}
2015-08-07 01:08:56 +00:00
node . Status . Addresses = [ ] api . NodeAddress {
{ Type : api . NodeLegacyHostIP , Address : ip . String ( ) } ,
{ Type : api . NodeInternalIP , Address : ip . String ( ) } ,
}
2015-07-23 18:28:27 +00:00
}
2015-05-20 21:21:03 +00:00
}
}
2015-02-23 21:04:45 +00:00
}
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
2015-03-16 12:50:00 +00:00
info , err := kl . GetCachedMachineInfo ( )
2015-02-23 21:04:45 +00:00
if err != nil {
2015-05-20 21:21:03 +00:00
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node . Status . Capacity = api . ResourceList {
api . ResourceCPU : * resource . NewMilliQuantity ( 0 , resource . DecimalSI ) ,
api . ResourceMemory : resource . MustParse ( "0Gi" ) ,
2015-05-27 17:40:28 +00:00
api . ResourcePods : * resource . NewQuantity ( int64 ( kl . pods ) , resource . DecimalSI ) ,
2015-05-20 21:21:03 +00:00
}
glog . Errorf ( "Error getting machine info: %v" , err )
2015-02-23 21:04:45 +00:00
} else {
node . Status . NodeInfo . MachineID = info . MachineID
node . Status . NodeInfo . SystemUUID = info . SystemUUID
2015-03-25 13:44:40 +00:00
node . Status . Capacity = CapacityFromMachineInfo ( info )
2015-05-18 22:32:32 +00:00
node . Status . Capacity [ api . ResourcePods ] = * resource . NewQuantity (
int64 ( kl . pods ) , resource . DecimalSI )
2015-03-27 16:44:40 +00:00
if node . Status . NodeInfo . BootID != "" &&
node . Status . NodeInfo . BootID != info . BootID {
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
2015-08-11 07:25:10 +00:00
kl . recorder . Eventf ( kl . nodeRef , "Rebooted" ,
2015-06-12 15:40:34 +00:00
"Node %s has been rebooted, boot id: %s" , kl . nodeName , info . BootID )
2015-03-27 16:44:40 +00:00
}
node . Status . NodeInfo . BootID = info . BootID
2015-02-23 21:04:45 +00:00
}
2015-03-31 07:22:37 +00:00
verinfo , err := kl . cadvisor . VersionInfo ( )
if err != nil {
2015-05-20 21:21:03 +00:00
glog . Errorf ( "Error getting version info: %v" , err )
2015-03-31 07:22:37 +00:00
} else {
node . Status . NodeInfo . KernelVersion = verinfo . KernelVersion
node . Status . NodeInfo . OsImage = verinfo . ContainerOsVersion
// TODO: Determine whether the runtime is docker or rkt
node . Status . NodeInfo . ContainerRuntimeVersion = "docker://" + verinfo . DockerVersion
node . Status . NodeInfo . KubeletVersion = version . Get ( ) . String ( )
// TODO: kube-proxy might be a different version from the kubelet in the future
node . Status . NodeInfo . KubeProxyVersion = version . Get ( ) . String ( )
}
2015-05-05 10:19:54 +00:00
// Check whether container runtime can be reported as up.
2015-06-18 05:34:11 +00:00
containerRuntimeUp := kl . containerRuntimeUp ( )
2015-06-24 18:10:10 +00:00
// Check whether network is configured properly
networkConfigured := kl . doneNetworkConfigure ( )
2015-05-05 10:19:54 +00:00
2015-03-20 17:35:41 +00:00
currentTime := util . Now ( )
2015-05-20 11:09:18 +00:00
var newNodeReadyCondition api . NodeCondition
var oldNodeReadyConditionStatus api . ConditionStatus
2015-05-11 21:07:24 +00:00
if containerRuntimeUp && networkConfigured {
2015-05-20 11:09:18 +00:00
newNodeReadyCondition = api . NodeCondition {
2015-05-05 10:19:54 +00:00
Type : api . NodeReady ,
Status : api . ConditionTrue ,
2015-05-11 21:07:24 +00:00
Reason : "kubelet is posting ready status" ,
2015-05-05 10:19:54 +00:00
LastHeartbeatTime : currentTime ,
}
} else {
2015-05-11 21:07:24 +00:00
var reasons [ ] string
if ! containerRuntimeUp {
reasons = append ( reasons , "container runtime is down" )
}
if ! networkConfigured {
reasons = append ( reasons , "network not configured correctly" )
}
2015-05-20 11:09:18 +00:00
newNodeReadyCondition = api . NodeCondition {
2015-05-05 10:19:54 +00:00
Type : api . NodeReady ,
Status : api . ConditionFalse ,
2015-05-11 21:07:24 +00:00
Reason : strings . Join ( reasons , "," ) ,
2015-05-05 10:19:54 +00:00
LastHeartbeatTime : currentTime ,
}
2015-02-23 21:04:45 +00:00
}
2015-05-05 10:19:54 +00:00
2015-02-23 21:04:45 +00:00
updated := false
for i := range node . Status . Conditions {
if node . Status . Conditions [ i ] . Type == api . NodeReady {
2015-05-20 11:09:18 +00:00
oldNodeReadyConditionStatus = node . Status . Conditions [ i ] . Status
if oldNodeReadyConditionStatus == newNodeReadyCondition . Status {
newNodeReadyCondition . LastTransitionTime = node . Status . Conditions [ i ] . LastTransitionTime
} else {
newNodeReadyCondition . LastTransitionTime = currentTime
2015-03-30 13:21:01 +00:00
}
2015-05-20 11:09:18 +00:00
node . Status . Conditions [ i ] = newNodeReadyCondition
2015-02-23 21:04:45 +00:00
updated = true
}
}
if ! updated {
2015-05-20 11:09:18 +00:00
newNodeReadyCondition . LastTransitionTime = currentTime
node . Status . Conditions = append ( node . Status . Conditions , newNodeReadyCondition )
}
if ! updated || oldNodeReadyConditionStatus != newNodeReadyCondition . Status {
if newNodeReadyCondition . Status == api . ConditionTrue {
kl . recordNodeStatusEvent ( "NodeReady" )
} else {
kl . recordNodeStatusEvent ( "NodeNotReady" )
}
2015-02-23 21:04:45 +00:00
}
2015-04-09 01:22:44 +00:00
if oldNodeUnschedulable != node . Spec . Unschedulable {
if node . Spec . Unschedulable {
2015-04-22 14:48:38 +00:00
kl . recordNodeStatusEvent ( "NodeNotSchedulable" )
2015-04-09 01:22:44 +00:00
} else {
2015-04-22 14:48:38 +00:00
kl . recordNodeStatusEvent ( "NodeSchedulable" )
2015-04-09 01:22:44 +00:00
}
oldNodeUnschedulable = node . Spec . Unschedulable
}
2015-05-20 21:21:03 +00:00
return nil
}
2015-04-11 00:29:56 +00:00
2015-06-18 05:34:11 +00:00
func ( kl * Kubelet ) containerRuntimeUp ( ) bool {
kl . runtimeMutex . Lock ( )
defer kl . runtimeMutex . Unlock ( )
return kl . lastTimestampRuntimeUp . Add ( kl . runtimeUpThreshold ) . After ( time . Now ( ) )
}
2015-06-24 18:10:10 +00:00
func ( kl * Kubelet ) doneNetworkConfigure ( ) bool {
kl . networkConfigMutex . Lock ( )
defer kl . networkConfigMutex . Unlock ( )
return kl . networkConfigured
}
2015-05-20 21:21:03 +00:00
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func ( kl * Kubelet ) tryUpdateNodeStatus ( ) error {
2015-06-12 15:40:34 +00:00
node , err := kl . kubeClient . Nodes ( ) . Get ( kl . nodeName )
2015-05-20 21:21:03 +00:00
if err != nil {
2015-06-12 15:40:34 +00:00
return fmt . Errorf ( "error getting node %q: %v" , kl . nodeName , err )
2015-05-20 21:21:03 +00:00
}
if node == nil {
2015-06-12 15:40:34 +00:00
return fmt . Errorf ( "no node instance returned for %q" , kl . nodeName )
2015-05-20 21:21:03 +00:00
}
2015-07-08 21:58:14 +00:00
kl . networkConfigMutex . Lock ( )
2015-06-24 18:10:10 +00:00
kl . podCIDR = node . Spec . PodCIDR
2015-07-08 21:58:14 +00:00
kl . networkConfigMutex . Unlock ( )
2015-06-24 18:10:10 +00:00
2015-05-20 21:21:03 +00:00
if err := kl . setNodeStatus ( node ) ; err != nil {
return err
}
2015-04-11 00:29:56 +00:00
// Update the current status on the API server
2015-04-08 09:32:47 +00:00
_ , err = kl . kubeClient . Nodes ( ) . UpdateStatus ( node )
2015-02-23 21:04:45 +00:00
return err
}
2015-06-12 11:11:53 +00:00
// GetPhase returns the phase of a pod given its container info.
// This func is exported to simplify integration with third-party kubelet
// integrations like kubernetes-mesos.
func GetPhase ( spec * api . PodSpec , info [ ] api . ContainerStatus ) api . PodPhase {
2015-01-28 17:56:35 +00:00
running := 0
waiting := 0
stopped := 0
failed := 0
succeeded := 0
unknown := 0
for _ , container := range spec . Containers {
2015-03-25 11:09:35 +00:00
if containerStatus , ok := api . GetContainerStatus ( info , container . Name ) ; ok {
2015-01-28 17:56:35 +00:00
if containerStatus . State . Running != nil {
running ++
2015-05-27 22:02:11 +00:00
} else if containerStatus . State . Terminated != nil {
2015-01-28 17:56:35 +00:00
stopped ++
2015-05-27 22:02:11 +00:00
if containerStatus . State . Terminated . ExitCode == 0 {
2015-01-28 17:56:35 +00:00
succeeded ++
} else {
failed ++
}
} else if containerStatus . State . Waiting != nil {
waiting ++
} else {
unknown ++
}
} else {
unknown ++
}
}
switch {
case waiting > 0 :
2015-02-09 21:55:36 +00:00
glog . V ( 5 ) . Infof ( "pod waiting > 0, pending" )
2015-01-28 17:56:35 +00:00
// One or more containers have not been started
return api . PodPending
case running > 0 && unknown == 0 :
// All containers have been started, and at least
// one container is running
return api . PodRunning
case running == 0 && stopped > 0 && unknown == 0 :
// All containers are terminated
2015-03-14 01:38:07 +00:00
if spec . RestartPolicy == api . RestartPolicyAlways {
2015-01-28 17:56:35 +00:00
// All containers are in the process of restarting
return api . PodRunning
}
if stopped == succeeded {
// RestartPolicy is not Always, and all
// containers are terminated in success
return api . PodSucceeded
}
2015-03-14 01:38:07 +00:00
if spec . RestartPolicy == api . RestartPolicyNever {
2015-01-28 17:56:35 +00:00
// RestartPolicy is Never, and all containers are
// terminated with at least one in failure
return api . PodFailed
}
// RestartPolicy is OnFailure, and at least one in failure
// and in the process of restarting
return api . PodRunning
default :
2015-02-09 21:55:36 +00:00
glog . V ( 5 ) . Infof ( "pod default case, pending" )
2015-01-28 17:56:35 +00:00
return api . PodPending
}
}
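// Worked examples (illustrative, not part of the kubelet) of how GetPhase
// resolves a pod's phase from its container statuses:
//   - any container still Waiting                              -> PodPending
//   - all containers Running                                   -> PodRunning
//   - all Terminated, RestartPolicy=Always                     -> PodRunning (restarting)
//   - all Terminated with exit code 0 (policy Never/OnFailure) -> PodSucceeded
//   - all Terminated, some non-zero, RestartPolicy=Never       -> PodFailed
//   - all Terminated, some non-zero, RestartPolicy=OnFailure   -> PodRunning (restarting)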
2015-02-02 18:51:52 +00:00
// getPodReadyCondition returns a ready condition if all containers in a pod are ready; otherwise it returns an unready condition.
2015-03-25 11:09:35 +00:00
func getPodReadyCondition ( spec * api . PodSpec , statuses [ ] api . ContainerStatus ) [ ] api . PodCondition {
2015-02-02 18:51:52 +00:00
ready := [ ] api . PodCondition { {
2015-02-24 05:21:14 +00:00
Type : api . PodReady ,
2015-03-23 18:33:55 +00:00
Status : api . ConditionTrue ,
2015-02-02 18:51:52 +00:00
} }
unready := [ ] api . PodCondition { {
2015-02-24 05:21:14 +00:00
Type : api . PodReady ,
2015-03-23 18:33:55 +00:00
Status : api . ConditionFalse ,
2015-02-02 18:51:52 +00:00
} }
2015-03-25 11:09:35 +00:00
if statuses == nil {
2015-02-02 18:51:52 +00:00
return unready
}
for _ , container := range spec . Containers {
2015-03-25 11:09:35 +00:00
if containerStatus , ok := api . GetContainerStatus ( statuses , container . Name ) ; ok {
2015-02-02 18:51:52 +00:00
if ! containerStatus . Ready {
return unready
}
} else {
return unready
}
}
return ready
}
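// Illustrative note, not part of the kubelet: a pod whose spec declares
// containers "app" and "sidecar" gets the ready condition only when a status
// exists for both containers and each has Ready=true; a missing or unready
// status for either container yields the unready condition.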
2015-03-19 23:51:34 +00:00
// By passing the pod directly, this method avoids pod lookup, which requires
// grabbing a lock.
2015-04-08 18:53:31 +00:00
func ( kl * Kubelet ) generatePodStatus ( pod * api . Pod ) ( api . PodStatus , error ) {
2015-06-09 21:01:23 +00:00
start := time . Now ( )
defer func ( ) {
metrics . PodStatusLatency . Observe ( metrics . SinceInMicroseconds ( start ) )
} ( )
2015-03-23 17:14:30 +00:00
podFullName := kubecontainer . GetPodFullName ( pod )
2015-03-19 23:51:34 +00:00
glog . V ( 3 ) . Infof ( "Generating status for %q" , podFullName )
2015-01-14 02:11:24 +00:00
2015-05-16 00:01:56 +00:00
// TODO: Consider including the container information.
if kl . pastActiveDeadline ( pod ) {
2015-06-09 15:58:16 +00:00
reason := "DeadlineExceeded"
kl . recorder . Eventf ( pod , reason , "Pod was active on the node longer than specified deadline" )
2015-05-16 00:01:56 +00:00
return api . PodStatus {
Phase : api . PodFailed ,
2015-06-09 15:58:16 +00:00
Reason : reason ,
2015-05-16 00:01:56 +00:00
Message : "Pod was active on the node longer than specified deadline" } , nil
}
2015-05-09 05:01:43 +00:00
2015-05-16 00:01:56 +00:00
spec := & pod . Spec
2015-05-01 22:25:11 +00:00
podStatus , err := kl . containerRuntime . GetPodStatus ( pod )
2015-01-14 02:11:24 +00:00
2015-02-09 21:55:36 +00:00
if err != nil {
2015-02-09 21:55:36 +00:00
// Error handling
2015-05-11 17:41:52 +00:00
glog . Infof ( "Query container info for pod %q failed with error (%v)" , podFullName , err )
2015-02-09 21:55:36 +00:00
if strings . Contains ( err . Error ( ) , "resource temporarily unavailable" ) {
// Leave upstream layer to decide what to do
2015-03-18 16:44:50 +00:00
return api . PodStatus { } , err
2015-02-09 21:55:36 +00:00
}
2015-06-09 15:58:16 +00:00
pendingStatus := api . PodStatus {
Phase : api . PodPending ,
Reason : "GeneralError" ,
Message : fmt . Sprintf ( "Query container info failed with error (%v)" , err ) ,
}
return pendingStatus , nil
2015-02-09 21:55:36 +00:00
}
2015-02-09 21:55:36 +00:00
// Assume info is ready to process
2015-06-12 11:11:53 +00:00
podStatus . Phase = GetPhase ( spec , podStatus . ContainerStatuses )
2015-02-02 18:51:52 +00:00
for _ , c := range spec . Containers {
2015-03-25 11:09:35 +00:00
for i , st := range podStatus . ContainerStatuses {
if st . Name == c . Name {
2015-04-30 19:15:23 +00:00
ready := st . State . Running != nil && kl . readinessManager . GetReadiness ( kubecontainer . TrimRuntimePrefix ( st . ContainerID ) )
2015-03-27 01:45:23 +00:00
podStatus . ContainerStatuses [ i ] . Ready = ready
2015-03-25 11:09:35 +00:00
break
}
}
2015-02-02 18:51:52 +00:00
}
2015-05-09 05:01:43 +00:00
2015-03-25 11:09:35 +00:00
podStatus . Conditions = append ( podStatus . Conditions , getPodReadyCondition ( spec , podStatus . ContainerStatuses ) ... )
2015-04-08 18:53:31 +00:00
2015-06-12 17:20:26 +00:00
if ! kl . standaloneMode {
hostIP , err := kl . GetHostIP ( )
if err != nil {
glog . V ( 4 ) . Infof ( "Cannot get host IP: %v" , err )
} else {
podStatus . HostIP = hostIP . String ( )
2015-06-18 18:30:59 +00:00
if pod . Spec . HostNetwork && podStatus . PodIP == "" {
podStatus . PodIP = hostIP . String ( )
}
2015-06-12 17:20:26 +00:00
}
2015-03-24 12:35:38 +00:00
}
2015-01-28 17:56:35 +00:00
2015-03-18 16:44:50 +00:00
return * podStatus , nil
2014-07-15 17:26:56 +00:00
}
2014-07-15 07:04:30 +00:00
// Returns logs of current machine.
func ( kl * Kubelet ) ServeLogs ( w http . ResponseWriter , req * http . Request ) {
// TODO: whitelist logs we are willing to serve
2014-07-22 21:40:59 +00:00
kl . logServer . ServeHTTP ( w , req )
2014-07-15 07:04:30 +00:00
}
2014-08-07 18:15:11 +00:00
2015-04-06 23:58:34 +00:00
// findContainer finds and returns the container with the given pod ID, full name, and container name.
// It returns nil if not found.
func ( kl * Kubelet ) findContainer ( podFullName string , podUID types . UID , containerName string ) ( * kubecontainer . Container , error ) {
2015-05-01 22:25:11 +00:00
pods , err := kl . containerRuntime . GetPods ( false )
2015-04-06 23:58:34 +00:00
if err != nil {
return nil , err
}
pod := kubecontainer . Pods ( pods ) . FindPod ( podFullName , podUID )
return pod . FindContainerByName ( containerName ) , nil
}
2014-08-07 18:15:11 +00:00
// Run a command in a container and return the combined stdout and stderr as a byte array.
2015-04-06 23:58:34 +00:00
func ( kl * Kubelet ) RunInContainer ( podFullName string , podUID types . UID , containerName string , cmd [ ] string ) ( [ ] byte , error ) {
podUID = kl . podManager . TranslatePodUID ( podUID )
2015-03-20 20:55:26 +00:00
2015-04-06 23:58:34 +00:00
container , err := kl . findContainer ( podFullName , podUID , containerName )
2014-08-07 18:15:11 +00:00
if err != nil {
return nil , err
}
2015-04-06 23:58:34 +00:00
if container == nil {
return nil , fmt . Errorf ( "container not found (%q)" , containerName )
2014-08-07 18:15:11 +00:00
}
2015-04-06 23:58:34 +00:00
return kl . runner . RunInContainer ( string ( container . ID ) , cmd )
2014-08-07 18:15:11 +00:00
}
2014-11-10 21:13:57 +00:00
2015-01-08 20:41:38 +00:00
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
2015-04-06 23:58:34 +00:00
func ( kl * Kubelet ) ExecInContainer ( podFullName string , podUID types . UID , containerName string , cmd [ ] string , stdin io . Reader , stdout , stderr io . WriteCloser , tty bool ) error {
podUID = kl . podManager . TranslatePodUID ( podUID )
2015-03-20 20:55:26 +00:00
2015-04-06 23:58:34 +00:00
container , err := kl . findContainer ( podFullName , podUID , containerName )
2015-01-08 20:41:38 +00:00
if err != nil {
return err
}
2015-04-06 23:58:34 +00:00
if container == nil {
return fmt . Errorf ( "container not found (%q)" , containerName )
2015-01-08 20:41:38 +00:00
}
2015-04-06 23:58:34 +00:00
return kl . runner . ExecInContainer ( string ( container . ID ) , cmd , stdin , stdout , stderr , tty )
2015-01-08 20:41:38 +00:00
}
2015-07-28 04:48:55 +00:00
func ( kl * Kubelet ) AttachContainer ( podFullName string , podUID types . UID , containerName string , stdin io . Reader , stdout , stderr io . WriteCloser , tty bool ) error {
podUID = kl . podManager . TranslatePodUID ( podUID )
container , err := kl . findContainer ( podFullName , podUID , containerName )
if err != nil {
return err
}
if container == nil {
return fmt . Errorf ( "container not found (%q)" , containerName )
}
return kl . containerRuntime . AttachContainer ( string ( container . ID ) , stdin , stdout , stderr , tty )
}

// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error {
	podUID = kl.podManager.TranslatePodUID(podUID)

	pods, err := kl.containerRuntime.GetPods(false)
	if err != nil {
		return err
	}
	pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
	if pod.IsEmpty() {
		return fmt.Errorf("pod not found (%q)", podFullName)
	}
	return kl.runner.PortForward(&pod, port, stream)
}
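
// Illustrative sketch (hypothetical caller, assuming it already holds a connected
// io.ReadWriteCloser such as a stream negotiated by the kubelet's HTTP server):
// forward traffic to port 8080 of a pod. The pod full name and UID are made up.
//
//	if err := kl.PortForward("mypod_default", types.UID("1234-abcd"), 8080, stream); err != nil {
//		// the pod was not found, or the runtime could not set up forwarding
//	}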

// BirthCry sends an event that the kubelet has started up.
func (kl *Kubelet) BirthCry() {
	// Make an event that kubelet restarted.
	kl.recorder.Eventf(kl.nodeRef, "Starting", "Starting kubelet.")
}

// StreamingConnectionIdleTimeout returns the kubelet's idle timeout for streaming connections.
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
	return kl.streamingConnectionIdleTimeout
}

// ResyncInterval returns the kubelet's pod resync interval.
func (kl *Kubelet) ResyncInterval() time.Duration {
	return kl.resyncInterval
}

// GetContainerInfo returns stats (from cAdvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
	podUID = kl.podManager.TranslatePodUID(podUID)

	pods, err := kl.runtimeCache.GetPods()
	if err != nil {
		return nil, err
	}
	pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
	container := pod.FindContainerByName(containerName)
	if container == nil {
		return nil, ErrContainerNotFound
	}

	ci, err := kl.cadvisor.DockerContainer(string(container.ID), req)
	if err != nil {
		return nil, err
	}
	return &ci, nil
}

// GetRawContainerInfo returns stats (from cAdvisor) for a non-Kubernetes container.
func (kl *Kubelet) GetRawContainerInfo(containerName string, req *cadvisorApi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorApi.ContainerInfo, error) {
	if subcontainers {
		return kl.cadvisor.SubcontainerInfo(containerName, req)
	} else {
		containerInfo, err := kl.cadvisor.ContainerInfo(containerName, req)
		if err != nil {
			return nil, err
		}
		return map[string]*cadvisorApi.ContainerInfo{
			containerInfo.Name: containerInfo,
		}, nil
	}
}
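
// Illustrative sketch (hypothetical caller): fetch stats for the root cgroup and
// all of its subcontainers. "/" is cAdvisor's name for the root container; the
// request parameters below are made up.
//
//	req := &cadvisorApi.ContainerInfoRequest{NumStats: 60}
//	infos, err := kl.GetRawContainerInfo("/", req, true)
//	if err != nil {
//		// cAdvisor could not be queried or the container does not exist
//	}
//	for name := range infos {
//		glog.V(4).Infof("got raw stats for %q", name)
//	}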

// GetCachedMachineInfo assumes that the machine info can't change without a reboot
func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
	if kl.machineInfo == nil {
		info, err := kl.cadvisor.MachineInfo()
		if err != nil {
			return nil, err
		}
		kl.machineInfo = info
	}
	return kl.machineInfo, nil
}
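
// Illustrative sketch (hypothetical caller): read basic capacity figures from the
// cached machine info. NumCores and MemoryCapacity are assumed field names from
// the vendored cAdvisor v1 MachineInfo type; treat them as assumptions.
//
//	info, err := kl.GetCachedMachineInfo()
//	if err != nil {
//		// cAdvisor has not started yet or machine info is unavailable
//	}
//	glog.Infof("node has %d cores and %d bytes of memory", info.NumCores, info.MemoryCapacity)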

// ListenAndServe runs the kubelet's main HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *TLSOptions, enableDebuggingHandlers bool) {
	ListenAndServeKubeletServer(kl, address, port, tlsOptions, enableDebuggingHandlers)
}

// ListenAndServeReadOnly runs the kubelet's read-only HTTP server.
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
	ListenAndServeKubeletReadOnlyServer(kl, address, port)
}

// GetRuntime returns the current Runtime implementation in use by the kubelet. This func
// is exported to simplify integration with third party kubelet extensions (e.g. kubernetes-mesos).
func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
	return kl.containerRuntime
}