kubelet: trigger pod workers independently

Currently, whenever there is any update, the kubelet forces all pod workers to
sync again, causing resource contention and hence performance degradation.

This commit switches the kubelet to incremental updates (as opposed to
snapshots), so it knows which pods have changed and sends updates only to the
affected pod workers. The `SyncPods` function has been replaced with
individual handlers, one per operation (ADD, UPDATE, REMOVE). Pod workers are
still triggered periodically, and the kubelet performs periodic cleanup as well.
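
For context, the dispatch pattern reads roughly as follows. This is a
compilable sketch only, with illustrative stand-in types and names rather than
the kubelet's actual definitions; the real interface and sync loop are in the
diff below.

```go
// Minimal sketch of the per-operation dispatch that replaces SyncPods.
// All types and names are illustrative stand-ins, not the kubelet's real API.
package kubeletsketch

import "time"

type Pod struct{ UID, Name string }

type Op string

const (
	ADD    Op = "ADD"
	UPDATE Op = "UPDATE"
	REMOVE Op = "REMOVE"
)

type PodUpdate struct {
	Op   Op
	Pods []*Pod
}

// SyncHandler mirrors the shape of the new interface: one handler per
// operation, plus periodic sync and cleanup.
type SyncHandler interface {
	HandlePodAdditions(pods []*Pod)
	HandlePodUpdates(pods []*Pod)
	HandlePodDeletions(pods []*Pod)
	HandlePodSyncs(pods []*Pod)
	HandlePodCleanups() error
}

// syncLoopIteration waits for one incremental update or a resync tick and
// dispatches only the affected pods, instead of re-syncing every pod worker.
// It returns false when the update channel is closed.
func syncLoopIteration(updates <-chan PodUpdate, handler SyncHandler,
	resyncInterval time.Duration, allPods func() []*Pod) bool {
	select {
	case u, ok := <-updates:
		if !ok {
			return false
		}
		switch u.Op {
		case ADD:
			handler.HandlePodAdditions(u.Pods)
		case UPDATE:
			handler.HandlePodUpdates(u.Pods)
		case REMOVE:
			handler.HandlePodDeletions(u.Pods)
		}
	case <-time.After(resyncInterval):
		// Periodic pass: trigger every pod worker and clean up leftovers.
		handler.HandlePodSyncs(allPods())
		_ = handler.HandlePodCleanups()
	}
	return true
}
```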

This commit also spawns a new goroutine solely responsible for killing pods.
This is necessary because pod killing could hold up the sync loop for an
indefinitely long time, now that users can define the graceful termination
period in the container spec.
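
The killer-goroutine pattern is sketched below, under the same caveat
(illustrative names and simplified types, not the code from this commit): kill
requests arrive on a buffered channel, and a set of in-flight pod UIDs prevents
concurrent kills of the same pod, so a long graceful termination never stalls
the sender.

```go
// Minimal sketch of a dedicated pod-killer goroutine; names are illustrative.
package kubeletsketch

type runningPod struct {
	UID  string
	Name string
}

func podKiller(podKillingCh <-chan *runningPod, killPod func(*runningPod) error) {
	killing := map[string]bool{} // UIDs of pods currently being killed
	done := make(chan string)
	for {
		select {
		case pod, ok := <-podKillingCh:
			if !ok {
				return // channel closed; stop the killer
			}
			if killing[pod.UID] {
				break // this pod is already being killed; drop the duplicate
			}
			killing[pod.UID] = true
			go func(p *runningPod) {
				// May block for the pod's full graceful-termination period,
				// without holding up the sync loop.
				_ = killPod(p)
				done <- p.UID
			}(pod)
		case uid := <-done:
			delete(killing, uid)
		}
	}
}
```
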
Yu-Ju Hong 2015-08-18 17:52:26 -07:00
parent 5dfc904c18
commit b906e34576
10 changed files with 402 additions and 399 deletions

View File

@ -688,7 +688,7 @@ func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfi
func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates, kc.Recorder)
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)
// define file config source
if kc.ConfigFile != "" {

View File

@ -504,8 +504,6 @@ func (kl *kubeletExecutor) Run(updates <-chan kubelet.PodUpdate) {
util.Until(func() { kl.Kubelet.Run(pipe) }, 0, kl.executorDone)
//TODO(jdef) revisit this if/when executor failover lands
err := kl.SyncPods([]*api.Pod{}, nil, nil, time.Now())
if err != nil {
log.Errorf("failed to cleanly remove all pods and associated state: %v", err)
}
// Force kubelet to delete all pods.
kl.HandlePodDeletions(kl.GetPods())
}

View File

@ -249,7 +249,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
}
case kubelet.SET:
glog.V(4).Infof("Setting pods for source %s : %v", source, update)
glog.V(4).Infof("Setting pods for source %s", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods

View File

@ -42,6 +42,8 @@ func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateCompl
func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {}
func (f *fakePodWorkers) ForgetWorker(uid types.UID) {}
type TestingInterface interface {
Errorf(format string, args ...interface{})
}

View File

@ -82,6 +82,11 @@ const (
// max backoff period
maxContainerBackOff = 300 * time.Second
// Capacity of the channel for storing pods to kill. A small number should
// suffice because a goroutine is dedicated to checking the channel and does
// not block on anything else.
podKillingChannelCapacity = 50
)
var (
@ -92,11 +97,11 @@ var (
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occurring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, mirrorPods map[string]*api.Pod,
startTime time.Time) error
HandlePodAdditions(pods []*api.Pod)
HandlePodUpdates(pods []*api.Pod)
HandlePodDeletions(pods []*api.Pod)
HandlePodSyncs(pods []*api.Pod)
HandlePodCleanups() error
}
type SourcesReadyFn func() bool
@ -377,6 +382,8 @@ func NewMainKubelet(
}
klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
return klet, nil
}
@ -532,6 +539,9 @@ type Kubelet struct {
// Container restart Backoff
backOff *util.Backoff
// Channel for sending pods to kill.
podKillingCh chan *kubecontainer.Pod
}
// getRootDir returns the full path to the directory under which kubelet can
@ -745,6 +755,10 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go util.Until(kl.podKiller, 1*time.Second, util.NeverStop)
// Run the system oom watcher forever.
kl.statusManager.Start()
kl.syncLoop(updates, kl)
@ -1227,7 +1241,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
//
// If we end up here with a create event for an already running pod, it could result in a
// restart of its containers. This cannot happen unless the kubelet restarts, because the
// delete before the second create is processed by SyncPods, which cancels this pod worker.
// delete before the second create would cancel this pod worker.
//
// If the kubelet restarts, we have a bunch of running containers for which we get create
// events. This is ok, because the pod status for these will include the podIp and terminated
@ -1421,18 +1435,8 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
return false
}
//podIsTerminated returns true if status is in one of the terminated state.
func podIsTerminated(status *api.PodStatus) bool {
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
return true
}
return false
}
// Filter out pods in the terminated state ("Failed" or "Succeeded").
func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
var pods []*api.Pod
for _, pod := range allPods {
// Returns true if pod is in the terminated state ("Failed" or "Succeeded").
func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool {
var status api.PodStatus
// Check the cached pod status which was set after the last sync.
status, ok := kl.statusManager.GetPodStatus(pod.UID)
@ -1442,37 +1446,27 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
// restarted.
status = pod.Status
}
if podIsTerminated(&status) {
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
return true
}
return false
}
func (kl *Kubelet) filterOutTerminatedPods(pods []*api.Pod) []*api.Pod {
var filteredPods []*api.Pod
for _, p := range pods {
if kl.podIsTerminated(p) {
continue
}
pods = append(pods, pod)
filteredPods = append(filteredPods, p)
}
return pods
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType,
mirrorPods map[string]*api.Pod, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
// Handles pod admission.
pods := kl.admitPods(allPods, podSyncTypes)
glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
// Send updates to pod workers.
kl.dispatchWork(pods, podSyncTypes, mirrorPods, start)
// Clean up unwanted/orphaned resources.
if err := kl.cleanupPods(allPods, pods); err != nil {
return err
}
return nil
return filteredPods
}
// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[string]*api.Pod) {
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api.Pod) {
podUIDs := make(map[types.UID]bool)
for _, pod := range pods {
podUIDs[pod.UID] = true
@ -1483,53 +1477,79 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[str
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}
// dispatchWork dispatches pod updates to workers.
func (kl *Kubelet) dispatchWork(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType,
mirrorPods map[string]*api.Pod, start time.Time) {
// Check for any containers that need starting
for _, pod := range pods {
podFullName := kubecontainer.GetPodFullName(pod)
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() {
metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
// Note the number of containers for new pods.
if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
}
// cleanupPods performs a series of cleanup work, including terminating pod
// workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) error {
desiredPods := make(map[types.UID]empty)
for _, pod := range admittedPods {
desiredPods[pod.UID] = empty{}
}
// Stop the workers for no-longer existing pods.
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
func (kl *Kubelet) deletePod(uid types.UID) error {
if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
return fmt.Errorf("skipping delete because sources aren't ready yet")
}
kl.podWorkers.ForgetWorker(uid)
// Runtime cache may not have been updated with the pod, but it's okay
// because the periodic cleanup routine will attempt to delete again later.
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
return fmt.Errorf("error listing containers: %v", err)
}
pod := kubecontainer.Pods(runningPods).FindPod("", uid)
if pod.IsEmpty() {
return fmt.Errorf("pod not found")
}
kl.podKillingCh <- &pod
// TODO: delete the mirror pod here?
// We leave the volume/directory cleanup to the periodic cleanup routine.
return nil
}
// HandlePodCleanups performs a series of cleanup work, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
// TODO(yujuhong): This function is executed by the main sync loop, so it
// should not contain any blocking calls. Re-examine the function and decide
// whether or not we should move it into a separate goroutine.
func (kl *Kubelet) HandlePodCleanups() error {
if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.")
return nil
}
allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
// 1. If the containers were removed immediately after they died, kubelet
// may fail to generate correct statuses, let alone filtering correctly.
// 2. If kubelet restarted before writing the terminated status for a pod
// to the apiserver, it could still restart the terminated pod (even
// though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
activePods := kl.filterOutTerminatedPods(allPods)
desiredPods := make(map[types.UID]empty)
for _, pod := range activePods {
desiredPods[pod.UID] = empty{}
}
// Stop the workers for no-longer existing pods.
// TODO: is this the best place to forget pod workers?
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
// Kill containers associated with unwanted pods.
err = kl.killUnwantedPods(desiredPods, runningPods)
if err != nil {
glog.Errorf("Failed killing unwanted containers: %v", err)
for _, pod := range runningPods {
if _, found := desiredPods[pod.ID]; !found {
kl.podKillingCh <- pod
}
}
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
// Note that we just killed the unwanted pods. This may not have reflected
// in the cache. We need to bypass the cache to get the latest set of
// running pods to clean up the volumes.
@ -1571,42 +1591,38 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro
return err
}
// killUnwantedPods kills the unwanted, running pods in parallel.
func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
runningPods []*kubecontainer.Pod) error {
ch := make(chan error, len(runningPods))
defer close(ch)
numWorkers := 0
for _, pod := range runningPods {
if _, found := desiredPods[pod.ID]; found {
// Per-pod workers will handle the desired pods.
continue
}
numWorkers++
go func(pod *kubecontainer.Pod, ch chan error) {
var err error = nil
defer func() {
ch <- err
}()
glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
// Stop the containers.
err = kl.killPod(nil, *pod)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
// podKiller launches a goroutine to kill a pod received from the channel if
// another goroutine isn't already in action.
func (kl *Kubelet) podKiller() {
killing := util.NewStringSet()
resultCh := make(chan types.UID)
defer close(resultCh)
for {
select {
case pod, ok := <-kl.podKillingCh:
if !ok {
return
}
}(pod, ch)
if killing.Has(string(pod.ID)) {
// The pod is already being killed.
break
}
// Aggregate errors from the pod killing workers.
var errs []error
for i := 0; i < numWorkers; i++ {
err := <-ch
killing.Insert(string(pod.ID))
go func(pod *kubecontainer.Pod, ch chan types.UID) {
defer func() {
ch <- pod.ID
}()
glog.V(2).Infof("Killing unwanted pod %q", pod.Name)
err := kl.killPod(nil, *pod)
if err != nil {
errs = append(errs, err)
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
}
}(pod, resultCh)
case podID := <-resultCh:
killing.Delete(string(podID))
}
}
return utilErrors.NewAggregate(errs)
}
type podsByCreationTime []*api.Pod
@ -1624,44 +1640,34 @@ func (s podsByCreationTime) Less(i, j int) bool {
}
// checkHostPortConflicts detects pods with conflicted host ports.
func checkHostPortConflicts(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) {
func hasHostPortConflicts(pods []*api.Pod) bool {
ports := util.StringSet{}
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
for _, pod := range pods {
if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports); len(errs) != 0 {
if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports); len(errs) > 0 {
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", kubecontainer.GetPodFullName(pod), errs)
notFitting = append(notFitting, pod)
continue
return true
}
fitting = append(fitting, pod)
}
return
return false
}
// checkSufficientfFreeResources detects pods that exceeds node's resources.
func (kl *Kubelet) checkSufficientfFreeResources(pods []*api.Pod) (fitting []*api.Pod, notFittingCPU, notFittingMemory []*api.Pod) {
// hasInsufficientfFreeResources detects whether the pods exceed the node's resources.
// TODO: Consider integrating disk space into this function, and returning a
// suitable reason and message per resource type.
func (kl *Kubelet) hasInsufficientfFreeResources(pods []*api.Pod) (bool, bool) {
info, err := kl.GetCachedMachineInfo()
if err != nil {
glog.Errorf("error getting machine info: %v", err)
return pods, nil, nil
// TODO: Should we admit the pod when machine info is unavailable?
return false, false
}
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
capacity := CapacityFromMachineInfo(info)
return predicates.CheckPodsExceedingFreeResources(pods, capacity)
_, notFittingCPU, notFittingMemory := predicates.CheckPodsExceedingFreeResources(pods, capacity)
return len(notFittingCPU) > 0, len(notFittingMemory) > 0
}
// handleOutOfDisk detects if pods can't fit due to lack of disk space.
func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod {
if len(podSyncTypes) == 0 {
// regular sync. no new pods
return pods
}
func (kl *Kubelet) isOutOfDisk() bool {
outOfDockerDisk := false
outOfRootDisk := false
// Check disk space once globally and reject or accept all new pods.
@ -1681,120 +1687,53 @@ func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]S
// Disk manager will only declare out of disk problems if unfreeze has been called.
kl.diskSpaceManager.Unfreeze()
if !outOfDockerDisk && !outOfRootDisk {
// Disk space is fine.
return pods
return outOfDockerDisk || outOfRootDisk
}
var fitting []*api.Pod
for i := range pods {
pod := pods[i]
// Only reject pods that didn't start yet.
if podSyncTypes[pod.UID] == SyncPodCreate {
reason := "OutOfDisk"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to lack of disk space.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to lack of disk space."})
continue
}
fitting = append(fitting, pod)
}
return fitting
}
// checkNodeSelectorMatching detects pods that do not match node's labels.
func (kl *Kubelet) checkNodeSelectorMatching(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) {
// matchesNodeSelector returns true if pod matches node's labels.
func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool {
if kl.standaloneMode {
return pods, notFitting
return true
}
node, err := kl.GetNode()
if err != nil {
glog.Errorf("error getting node: %v", err)
return pods, nil
return true
}
for _, pod := range pods {
if !predicates.PodMatchesNodeLabels(pod, node) {
notFitting = append(notFitting, pod)
continue
}
fitting = append(fitting, pod)
}
return
return predicates.PodMatchesNodeLabels(pod, node)
}
// handleNotfittingPods handles pods that do not fit on the node and returns
// the pods that fit. It currently checks host port conflicts, node selector
// mismatches, and exceeded node capacity.
func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) []*api.Pod {
fitting, notFitting := checkHostPortConflicts(pods)
for _, pod := range notFitting {
reason := "HostPortConflict"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to host port conflict.")
func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) {
kl.recorder.Eventf(pod, reason, message)
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to host port conflict"})
}
fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
for _, pod := range notFitting {
reason := "NodeSelectorMismatching"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to node selector mismatch.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to node selector mismatch"})
}
fitting, notFittingCPU, notFittingMemory := kl.checkSufficientfFreeResources(fitting)
for _, pod := range notFittingCPU {
reason := "InsufficientFreeCPU"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to insufficient free CPU.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to insufficient free CPU"})
}
for _, pod := range notFittingMemory {
reason := "InsufficientFreeMemory"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to insufficient free memory.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to insufficient free memory"})
}
return fitting
Message: "Pod " + message})
}
// admitPods handles pod admission. It filters out terminated pods, and pods
// that don't fit on the node, and may reject pods if node is overcommitted.
func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod {
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
// 1. If the containers were removed immediately after they died, kubelet
// may fail to generate correct statuses, let alone filtering correctly.
// 2. If kubelet restarted before writing the terminated status for a pod
// to the apiserver, it could still restart the terminated pod (even
// though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
pods := kl.filterOutTerminatedPods(allPods)
// canAdmitPod determines if a pod can be admitted, and gives a reason if it
// cannot. "pod" is the new pod, while "pods" includes all admitted pods plus
// the new pod. The function returns a boolean indicating whether the pod can
// be admitted, a brief single-word reason, and a message explaining why the
// pod cannot be admitted.
func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) {
if hasHostPortConflicts(pods) {
return false, "HostPortConflict", "cannot start the pod due to host port conflict."
}
if !kl.matchesNodeSelector(pod) {
return false, "NodeSelectorMismatching", "cannot be started due to node selector mismatch"
}
cpu, memory := kl.hasInsufficientfFreeResources(pods)
if cpu {
return false, "InsufficientFreeCPU", "cannot start the pod due to insufficient free CPU."
} else if memory {
return false, "InsufficientFreeMemory", "cannot be started due to insufficient free memory"
}
if kl.isOutOfDisk() {
return false, "OutOfDisk", "cannot be started due to lack of disk space."
}
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
// Reject pods that we cannot run.
// handleNotFittingPods relies on static information (e.g. immutable fields
// in the pod specs or machine information that doesn't change without
// rebooting), and the pods are sorted by immutable creation time. Hence it
// should only rejects new pods without checking the pod sync types.
fitting := kl.handleNotFittingPods(pods)
// Reject new creation requests if diskspace is running low.
admittedPods := kl.handleOutOfDisk(fitting, podSyncTypes)
return admittedPods
return true, "", ""
}
// syncLoop is the main loop for processing changes. It watches for changes from
@ -1821,39 +1760,124 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl
glog.Infof("Skipping pod synchronization, network is not configured")
return
}
unsyncedPod := false
podSyncTypes := make(map[types.UID]SyncPodType)
select {
case u, ok := <-updates:
if !ok {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return
}
kl.podManager.UpdatePods(u, podSyncTypes)
unsyncedPod = true
kl.syncLoopMonitor.Store(time.Now())
switch u.Op {
case ADD:
glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods))
handler.HandlePodAdditions(u.Pods)
case UPDATE:
glog.V(2).Infof("SyncLoop (UPDATE): %q", kubeletUtil.FormatPodNames(u.Pods))
handler.HandlePodUpdates(u.Pods)
case REMOVE:
glog.V(2).Infof("SyncLoop (REMOVE): %q", kubeletUtil.FormatPodNames(u.Pods))
handler.HandlePodDeletions(u.Pods)
case SET:
// TODO: Do we want to support this?
glog.Errorf("Kubelet does not support snapshot update")
}
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
// Periodically syncs all the pods and performs cleanup tasks.
glog.V(4).Infof("SyncLoop (periodic sync)")
handler.HandlePodSyncs(kl.podManager.GetPods())
if err := handler.HandlePodCleanups(); err != nil {
glog.Errorf("Failed cleaning pods: %v", err)
}
}
kl.syncLoopMonitor.Store(time.Now())
}
func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *api.Pod, start time.Time) {
if kl.podIsTerminated(pod) {
return
}
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(pod, mirrorPod, func() {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
})
// Note the number of containers for new pods.
if syncType == SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
// TODO: Consider handling all mirror pod updates in a separate component.
func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) {
// Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
// corresponding static pod. Send update to the pod worker if the static
// pod exists.
if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
kl.dispatchWork(pod, SyncPodUpdate, mirrorPod, start)
}
}
func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
start := time.Now()
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for unsyncedPod {
select {
case u := <-updates:
kl.podManager.UpdatePods(u, podSyncTypes)
kl.syncLoopMonitor.Store(time.Now())
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
sort.Sort(podsByCreationTime(pods))
for _, pod := range pods {
kl.podManager.AddPod(pod)
if isMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// Note that allPods includes the new pod since we added it at the
// beginning of the loop.
allPods := kl.podManager.GetPods()
// We have already failed the pods that we rejected, so activePods includes
// all admitted pods that are alive, plus the new pod.
activePods := kl.filterOutTerminatedPods(allPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start)
}
}
pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
kl.syncLoopMonitor.Store(time.Now())
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
start := time.Now()
for _, pod := range pods {
kl.podManager.UpdatePod(pod)
if isMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// TODO: Evaluate if we need to validate and reject updates.
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, SyncPodUpdate, mirrorPod, start)
}
}
func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
start := time.Now()
for _, pod := range pods {
kl.podManager.DeletePod(pod)
if isMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod.UID); err != nil {
glog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err)
}
}
}
func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
start := time.Now()
for _, pod := range pods {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, SyncPodSync, mirrorPod, start)
}
kl.syncLoopMonitor.Store(time.Now())
}
func (kl *Kubelet) LatestLoopEntryTime() time.Time {

View File

@ -132,6 +132,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
fakeClock := &util.FakeClock{Time: time.Now()}
kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock
kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
}
@ -348,10 +349,7 @@ func TestSyncPodsStartPod(t *testing.T) {
},
}
kubelet.podManager.SetPods(pods)
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodSyncs(pods)
fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)})
}
@ -375,16 +373,12 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
},
},
}
if err := kubelet.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodCleanups()
// Sources are not ready yet. Don't remove any pods.
fakeRuntime.AssertKilledPods([]string{})
ready = true
if err := kubelet.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodCleanups()
// Sources are ready. Remove unwanted pods.
fakeRuntime.AssertKilledPods([]string{"12345678"})
@ -2004,18 +1998,17 @@ func TestGetHostPortConflicts(t *testing.T) {
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}},
}
// Pods should not cause any conflict.
_, conflicts := checkHostPortConflicts(pods)
if len(conflicts) != 0 {
t.Errorf("expected no conflicts, Got %#v", conflicts)
if hasHostPortConflicts(pods) {
t.Errorf("expected no conflicts, Got conflicts")
}
// The new pod should cause conflict and be reported.
expected := &api.Pod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
}
// The new pod should cause conflict and be reported.
pods = append(pods, expected)
if _, actual := checkHostPortConflicts(pods); !reflect.DeepEqual(actual, []*api.Pod{expected}) {
t.Errorf("expected %#v, Got %#v", expected, actual)
if !hasHostPortConflicts(pods) {
t.Errorf("expected no conflict, Got no conflicts")
}
}
@ -2052,7 +2045,7 @@ func TestHandlePortConflicts(t *testing.T) {
// The newer pod should be rejected.
conflictedPod := pods[0]
kl.handleNotFittingPods(pods)
kl.HandlePodAdditions(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(conflictedPod.UID)
if !found {
@ -2094,7 +2087,7 @@ func TestHandleNodeSelector(t *testing.T) {
// The first pod should be rejected.
notfittingPod := pods[0]
kl.handleNotFittingPods(pods)
kl.HandlePodAdditions(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
if !found {
@ -2142,7 +2135,7 @@ func TestHandleMemExceeded(t *testing.T) {
// The newer pod should be rejected.
notfittingPod := pods[0]
kl.handleNotFittingPods(pods)
kl.HandlePodAdditions(pods)
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
if !found {
@ -2167,12 +2160,13 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
}
podToTest := pods[1]
// Run once to populate the status map.
kl.handleNotFittingPods(pods)
kl.HandlePodAdditions(pods)
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found {
t.Fatalf("expected to have status cached for pod2")
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
kl.podManager.SetPods([]*api.Pod{})
kl.HandlePodCleanups()
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found {
t.Fatalf("expected to not have status cached for pod2")
}
@ -2695,12 +2689,8 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
}
kl.podManager.SetPods(orphanPods)
pods, mirrorMap := kl.podManager.GetPodsAndMirrorMap()
// Sync with an empty pod list to delete all mirror pods.
err := kl.SyncPods(pods, emptyPodUIDs, mirrorMap, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kl.HandlePodCleanups()
if manager.NumOfPods() != 0 {
t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
}
@ -2802,7 +2792,7 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "foo",
Name: "staticFoo",
Namespace: "new",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "file",
@ -2815,11 +2805,9 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
},
},
}
kubelet.podManager.SetPods(pods)
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodSyncs(kubelet.podManager.GetPods())
status, ok := kubelet.statusManager.GetPodStatus(pods[0].UID)
if ok {
t.Errorf("unexpected status %#v found for static pod %q", status, pods[0].UID)
@ -3147,10 +3135,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
}
// Let the pod worker set the status to failed after this sync.
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodUpdates(pods)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
if !found {
t.Errorf("expected to found status for pod %q", pods[0].UID)
@ -3201,10 +3186,7 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
}
kubelet.podManager.SetPods(pods)
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.HandlePodUpdates(pods)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
if !found {
t.Errorf("expected to found status for pod %q", pods[0].UID)
@ -3239,10 +3221,7 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) {
kl.podManager.SetPods(pods)
// Sync to create pod directories.
err := kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kl.HandlePodSyncs(kl.podManager.GetPods())
for i := range pods {
if !dirExists(kl.getPodDir(pods[i].UID)) {
t.Errorf("expected directory to exist for pod %d", i)
@ -3250,10 +3229,8 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) {
}
// Pod 1 has been deleted and no longer exists.
err = kl.SyncPods([]*api.Pod{pods[0]}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kl.podManager.SetPods([]*api.Pod{pods[0]})
kl.HandlePodCleanups()
if !dirExists(kl.getPodDir(pods[0].UID)) {
t.Errorf("expected directory to exist for pod 0")
}
@ -3294,10 +3271,7 @@ func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) {
kl.podManager.SetPods(pods)
// Sync to create pod directories.
err := kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kl.HandlePodSyncs(pods)
for i := range pods {
if !dirExists(kl.getPodDir(pods[i].UID)) {
t.Errorf("expected directory to exist for pod %d", i)
@ -3307,7 +3281,7 @@ func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) {
// deleted.
kl.statusManager.SetPodStatus(pods[1], api.PodStatus{Phase: api.PodFailed})
kl.statusManager.SetPodStatus(pods[2], api.PodStatus{Phase: api.PodSucceeded})
err = kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
kl.HandlePodCleanups()
for i := range pods {
if !dirExists(kl.getPodDir(pods[i].UID)) {
t.Errorf("expected directory to exist for pod %d", i)

View File

@ -19,7 +19,6 @@ package kubelet
import (
"sync"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -45,9 +44,19 @@ type podManager interface {
GetPods() []*api.Pod
GetPodByFullName(podFullName string) (*api.Pod, bool)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod)
GetPodByMirrorPod(*api.Pod) (*api.Pod, bool)
GetMirrorPodByPod(*api.Pod) (*api.Pod, bool)
GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod)
// SetPods replaces the internal pods with the new pods.
// It is currently only used for testing.
SetPods(pods []*api.Pod)
UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType)
// Methods that modify a single pod.
AddPod(pod *api.Pod)
UpdatePod(pod *api.Pod)
DeletePod(pod *api.Pod)
DeleteOrphanedMirrorPods()
TranslatePodUID(uid types.UID) types.UID
IsMirrorPodOf(mirrorPod, pod *api.Pod) bool
@ -103,50 +112,6 @@ func newBasicPodManager(apiserverClient client.Interface) *basicPodManager {
return pm
}
// Update the internal pods with those provided by the update.
func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType) {
pm.lock.Lock()
defer pm.lock.Unlock()
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make(map[types.UID]struct{})
for uid := range pm.podByUID {
existingPods[uid] = struct{}{}
}
// Update the internal pods.
pm.setPods(u.Pods)
for uid := range pm.podByUID {
if _, ok := existingPods[uid]; !ok {
podSyncTypes[uid] = SyncPodCreate
}
}
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = SyncPodUpdate
}
allPods := applyUpdates(u.Pods, pm.getAllPods())
pm.setPods(allPods)
default:
panic("syncLoop does not support incremental changes")
}
// Mark all remaining pods as sync.
for uid := range pm.podByUID {
if _, ok := podSyncTypes[uid]; !ok {
podSyncTypes[uid] = SyncPodSync
}
}
}
// Set the internal pods based on the new pods.
func (pm *basicPodManager) SetPods(newPods []*api.Pod) {
pm.lock.Lock()
@ -177,24 +142,34 @@ func (pm *basicPodManager) setPods(newPods []*api.Pod) {
pm.mirrorPodByFullName = mirrorPodByFullName
}
func applyUpdates(changed []*api.Pod, current []*api.Pod) []*api.Pod {
updated := []*api.Pod{}
m := map[types.UID]*api.Pod{}
for _, pod := range changed {
m[pod.UID] = pod
func (pm *basicPodManager) AddPod(pod *api.Pod) {
pm.UpdatePod(pod)
}
for _, pod := range current {
if m[pod.UID] != nil {
updated = append(updated, m[pod.UID])
glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID])
func (pm *basicPodManager) UpdatePod(pod *api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod)
if isMirrorPod(pod) {
pm.mirrorPodByUID[pod.UID] = pod
pm.mirrorPodByFullName[podFullName] = pod
} else {
updated = append(updated, pod)
glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod)
pm.podByUID[pod.UID] = pod
pm.podByFullName[podFullName] = pod
}
}
return updated
func (pm *basicPodManager) DeletePod(pod *api.Pod) {
pm.lock.Lock()
defer pm.lock.Unlock()
podFullName := kubecontainer.GetPodFullName(pod)
if isMirrorPod(pod) {
delete(pm.mirrorPodByUID, pod.UID)
delete(pm.mirrorPodByFullName, podFullName)
} else {
delete(pm.podByUID, pod.UID)
delete(pm.podByFullName, podFullName)
}
}
// GetPods returns the regular pods bound to the kubelet and their spec.
@ -204,23 +179,20 @@ func (pm *basicPodManager) GetPods() []*api.Pod {
return podsMapToPods(pm.podByUID)
}
// GetPodsAndMirrorPods returns both the regular and mirror pods.
func (pm *basicPodManager) GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) {
pm.lock.RLock()
defer pm.lock.RUnlock()
pods := podsMapToPods(pm.podByUID)
mirrorPods := podsMapToPods(pm.mirrorPodByUID)
return pods, mirrorPods
}
// Returns all pods (including mirror pods).
func (pm *basicPodManager) getAllPods() []*api.Pod {
return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...)
}
// GetPodsAndMirrorMap returns a copy of the regular pods and the mirror
// pods indexed by full name.
func (pm *basicPodManager) GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod) {
pm.lock.RLock()
defer pm.lock.RUnlock()
mirrorPods := make(map[string]*api.Pod)
for key, pod := range pm.mirrorPodByFullName {
mirrorPods[key] = pod
}
return podsMapToPods(pm.podByUID), mirrorPods
}
// GetPodByName provides the (non-mirror) pod that matches namespace and name,
// as well as whether the pod was found.
func (pm *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
@ -295,3 +267,17 @@ func podsMapToPods(UIDMap map[types.UID]*api.Pod) []*api.Pod {
}
return pods
}
func (pm *basicPodManager) GetMirrorPodByPod(pod *api.Pod) (*api.Pod, bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()
mirrorPod, ok := pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)]
return mirrorPod, ok
}
func (pm *basicPodManager) GetPodByMirrorPod(mirrorPod *api.Pod) (*api.Pod, bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()
pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)]
return pod, ok
}

View File

@ -32,6 +32,7 @@ import (
type PodWorkers interface {
UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func())
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
ForgetWorker(uid types.UID)
}
type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod, SyncPodType) error
@ -171,21 +172,32 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete
}
}
func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
p.podLock.Lock()
defer p.podLock.Unlock()
for key, channel := range p.podUpdates {
if _, exists := desiredPods[key]; !exists {
close(channel)
delete(p.podUpdates, key)
func (p *podWorkers) removeWorker(uid types.UID) {
if ch, ok := p.podUpdates[uid]; ok {
close(ch)
delete(p.podUpdates, uid)
// If there is an undelivered work update for this pod we need to remove it
// since the per-pod goroutine won't be able to put it into the already closed
// channel when it finishes processing the current work update.
if _, cached := p.lastUndeliveredWorkUpdate[key]; cached {
delete(p.lastUndeliveredWorkUpdate, key)
if _, cached := p.lastUndeliveredWorkUpdate[uid]; cached {
delete(p.lastUndeliveredWorkUpdate, uid)
}
}
}
func (p *podWorkers) ForgetWorker(uid types.UID) {
p.podLock.Lock()
defer p.podLock.Unlock()
p.removeWorker(uid)
}
func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
p.podLock.Lock()
defer p.podLock.Unlock()
for key := range p.podUpdates {
if _, exists := desiredPods[key]; !exists {
p.removeWorker(key)
}
}
}
func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) {

View File

@ -53,10 +53,15 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) {
// runOnce runs a given set of pods and returns their status.
func (kl *Kubelet) runOnce(pods []*api.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
kl.handleNotFittingPods(pods)
ch := make(chan RunPodResult)
admitted := []*api.Pod{}
for _, pod := range pods {
// Check if we can admit the pod.
if ok, reason, message := kl.canAdmitPod(append(admitted, pod), pod); !ok {
kl.rejectPod(pod, reason, message)
} else {
admitted = append(admitted, pod)
}
go func(pod *api.Pod) {
err := kl.runPod(pod, retryDelay)
ch <- RunPodResult{pod, err}

View File

@ -76,6 +76,7 @@ func TestRunOnce(t *testing.T) {
cadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
podManager, _ := newFakePodManager()
diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{})
kb := &Kubelet{
rootDirectory: "/tmp/kubelet",
@ -88,6 +89,7 @@ func TestRunOnce(t *testing.T) {
podManager: podManager,
os: kubecontainer.FakeOS{},
volumeManager: newVolumeManager(),
diskSpaceManager: diskSpaceManager,
}
kb.containerManager, _ = newContainerManager(cadvisor, "", "", "")