From b906e34576d73a227e751301d1850bc95758ef58 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Tue, 18 Aug 2015 17:52:26 -0700 Subject: [PATCH] kubelet: trigger pod workers independently Currently, whenever there is any update, kubelet forces all pod workers to sync again, causing resource contention and hence performance degradation. This commit switches kubelet to incremental updates (as opposed to snapshots), which lets it know exactly which pods have changed and send updates only to the affected pod workers. The `SyncPods` function has been replaced with individual handlers, each handling one operation (ADD, REMOVE, UPDATE). Pod workers are still triggered periodically, and kubelet performs periodic cleanup as well. This commit also spawns a new goroutine solely responsible for killing pods. This is necessary because pod killing could hold up the sync loop for an indefinitely long time now that users can define a graceful termination period in the pod spec. --- cmd/kubelet/app/server.go | 2 +- contrib/mesos/pkg/executor/service/service.go | 6 +- pkg/kubelet/config/config.go | 2 +- pkg/kubelet/fake_pod_workers.go | 2 + pkg/kubelet/kubelet.go | 536 +++++++++--------- pkg/kubelet/kubelet_test.go | 78 +-- pkg/kubelet/pod_manager.go | 134 ++--- pkg/kubelet/pod_workers.go | 30 +- pkg/kubelet/runonce.go | 9 +- pkg/kubelet/runonce_test.go | 2 + 10 files changed, 402 insertions(+), 399 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 4f571739ba..fcdb015566 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -688,7 +688,7 @@ func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfi func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { // source of all configuration - cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates, kc.Recorder) + cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder) // define file config source if kc.ConfigFile != "" { diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index 1735009129..b37209218d 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -504,8 +504,6 @@ func (kl *kubeletExecutor) Run(updates <-chan kubelet.PodUpdate) { util.Until(func() { kl.Kubelet.Run(pipe) }, 0, kl.executorDone) //TODO(jdef) revisit this if/when executor failover lands - err := kl.SyncPods([]*api.Pod{}, nil, nil, time.Now()) - if err != nil { - log.Errorf("failed to cleanly remove all pods and associated state: %v", err) - } + // Force kubelet to delete all pods.
+ kl.HandlePodDeletions(kl.GetPods()) } diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 93b48519ff..6b0736d3b5 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -249,7 +249,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de } case kubelet.SET: - glog.V(4).Infof("Setting pods for source %s : %v", source, update) + glog.V(4).Infof("Setting pods for source %s", source) s.markSourceSet(source) // Clear the old map entries by just creating a new map oldPods := pods diff --git a/pkg/kubelet/fake_pod_workers.go b/pkg/kubelet/fake_pod_workers.go index e25b5505f0..7f036e60fd 100644 --- a/pkg/kubelet/fake_pod_workers.go +++ b/pkg/kubelet/fake_pod_workers.go @@ -42,6 +42,8 @@ func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateCompl func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {} +func (f *fakePodWorkers) ForgetWorker(uid types.UID) {} + type TestingInterface interface { Errorf(format string, args ...interface{}) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ea6ad0fc61..ca5661d6a5 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -82,6 +82,11 @@ const ( // max backoff period maxContainerBackOff = 300 * time.Second + + // Capacity of the channel for storing pods to kill. A small number should + // suffice because a goroutine is dedicated to check the channel and does + // not block on anything else. + podKillingChannelCapacity = 50 ) var ( @@ -92,11 +97,11 @@ var ( // SyncHandler is an interface implemented by Kubelet, for testability type SyncHandler interface { - // Syncs current state to match the specified pods. SyncPodType specified what - // type of sync is occurring per pod. StartTime specifies the time at which - // syncing began (for use in monitoring). - SyncPods(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, mirrorPods map[string]*api.Pod, - startTime time.Time) error + HandlePodAdditions(pods []*api.Pod) + HandlePodUpdates(pods []*api.Pod) + HandlePodDeletions(pods []*api.Pod) + HandlePodSyncs(pods []*api.Pod) + HandlePodCleanups() error } type SourcesReadyFn func() bool @@ -377,6 +382,8 @@ func NewMainKubelet( } klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff) + klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity) + return klet, nil } @@ -532,6 +539,9 @@ type Kubelet struct { // Container restart Backoff backOff *util.Backoff + + // Channel for sending pods to kill. + podKillingCh chan *kubecontainer.Pod } // getRootDir returns the full path to the directory under which kubelet can @@ -745,6 +755,10 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop) + // Start a goroutine responsible for killing pods (that are not properly + // handled by pod workers). + go util.Until(kl.podKiller, 1*time.Second, util.NeverStop) + // Run the system oom watcher forever. kl.statusManager.Start() kl.syncLoop(updates, kl) @@ -1227,7 +1241,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont // // If we end up here with a create event for an already running pod, it could result in a // restart of its containers. This cannot happen unless the kubelet restarts, because the - // delete before the second create is processed by SyncPods, which cancels this pod worker. + // delete before the second create would cancel this pod worker. 
// // If the kubelet restarts, we have a bunch of running containers for which we get create // events. This is ok, because the pod status for these will include the podIp and terminated @@ -1421,58 +1435,38 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool { return false } -//podIsTerminated returns true if status is in one of the terminated state. -func podIsTerminated(status *api.PodStatus) bool { +// Returns true if pod is in the terminated state ("Failed" or "Succeeded"). +func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool { + var status api.PodStatus + // Check the cached pod status which was set after the last sync. + status, ok := kl.statusManager.GetPodStatus(pod.UID) + if !ok { + // If there is no cached status, use the status from the + // apiserver. This is useful if kubelet has recently been + // restarted. + status = pod.Status + } if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded { return true } + return false } -// Filter out pods in the terminated state ("Failed" or "Succeeded"). -func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod { - var pods []*api.Pod - for _, pod := range allPods { - var status api.PodStatus - // Check the cached pod status which was set after the last sync. - status, ok := kl.statusManager.GetPodStatus(pod.UID) - if !ok { - // If there is no cached status, use the status from the - // apiserver. This is useful if kubelet has recently been - // restarted. - status = pod.Status - } - if podIsTerminated(&status) { +func (kl *Kubelet) filterOutTerminatedPods(pods []*api.Pod) []*api.Pod { + var filteredPods []*api.Pod + for _, p := range pods { + if kl.podIsTerminated(p) { continue } - pods = append(pods, pod) + filteredPods = append(filteredPods, p) } - return pods -} - -// SyncPods synchronizes the configured list of pods (desired state) with the host current state. -func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, - mirrorPods map[string]*api.Pod, start time.Time) error { - defer func() { - metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start)) - }() - - kl.removeOrphanedPodStatuses(allPods, mirrorPods) - // Handles pod admission. - pods := kl.admitPods(allPods, podSyncTypes) - glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods)) - // Send updates to pod workers. - kl.dispatchWork(pods, podSyncTypes, mirrorPods, start) - // Clean up unwanted/orphaned resources. - if err := kl.cleanupPods(allPods, pods); err != nil { - return err - } - return nil + return filteredPods } // removeOrphanedPodStatuses removes obsolete entries in podStatus where // the pod is no longer considered bound to this node. -func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[string]*api.Pod) { +func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api.Pod) { podUIDs := make(map[types.UID]bool) for _, pod := range pods { podUIDs[pod.UID] = true @@ -1483,53 +1477,79 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[str kl.statusManager.RemoveOrphanedStatuses(podUIDs) } -// dispatchWork dispatches pod updates to workers. -func (kl *Kubelet) dispatchWork(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, - mirrorPods map[string]*api.Pod, start time.Time) { - // Check for any containers that need starting - for _, pod := range pods { - podFullName := kubecontainer.GetPodFullName(pod) - // Run the sync in an async manifest worker. 
- kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() { - metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start)) - }) - // Note the number of containers for new pods. - if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) { - metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) - } - } -} - -// cleanupPods performs a series of cleanup work, including terminating pod -// workers, killing unwanted pods, and removing orphaned volumes/pod -// directories. -func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) error { - desiredPods := make(map[types.UID]empty) - for _, pod := range admittedPods { - desiredPods[pod.UID] = empty{} - } - // Stop the workers for no-longer existing pods. - kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) - +func (kl *Kubelet) deletePod(uid types.UID) error { if !kl.sourcesReady() { // If the sources aren't ready, skip deletion, as we may accidentally delete pods // for sources that haven't reported yet. - glog.V(4).Infof("Skipping deletes, sources aren't ready yet.") + return fmt.Errorf("skipping delete because sources aren't ready yet") + } + kl.podWorkers.ForgetWorker(uid) + + // Runtime cache may not have been updated to with the pod, but it's okay + // because the periodic cleanup routine will attempt to delete again later. + runningPods, err := kl.runtimeCache.GetPods() + if err != nil { + return fmt.Errorf("error listing containers: %v", err) + } + pod := kubecontainer.Pods(runningPods).FindPod("", uid) + if pod.IsEmpty() { + return fmt.Errorf("pod not found") + } + + kl.podKillingCh <- &pod + // TODO: delete the mirror pod here? + + // We leave the volume/directory cleanup to the periodic cleanup routine. + return nil +} + +// HandlePodCleanups performs a series of cleanup work, including terminating +// pod workers, killing unwanted pods, and removing orphaned volumes/pod +// directories. +// TODO(yujuhong): This function is executed by the main sync loop, so it +// should not contain any blocking calls. Re-examine the function and decide +// whether or not we should move it into a separte goroutine. +func (kl *Kubelet) HandlePodCleanups() error { + if !kl.sourcesReady() { + // If the sources aren't ready, skip deletion, as we may accidentally delete pods + // for sources that haven't reported yet. + glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.") return nil } + allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods() + // Pod phase progresses monotonically. Once a pod has reached a final state, + // it should never leave regardless of the restart policy. The statuses + // of such pods should not be changed, and there is no need to sync them. + // TODO: the logic here does not handle two cases: + // 1. If the containers were removed immediately after they died, kubelet + // may fail to generate correct statuses, let alone filtering correctly. + // 2. If kubelet restarted before writing the terminated status for a pod + // to the apiserver, it could still restart the terminated pod (even + // though the pod was not considered terminated by the apiserver). + // These two conditions could be alleviated by checkpointing kubelet. + activePods := kl.filterOutTerminatedPods(allPods) + + desiredPods := make(map[types.UID]empty) + for _, pod := range activePods { + desiredPods[pod.UID] = empty{} + } + // Stop the workers for no-longer existing pods. + // TODO: is here the best place to forget pod workers? 
+ kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) + runningPods, err := kl.runtimeCache.GetPods() if err != nil { glog.Errorf("Error listing containers: %#v", err) return err } - - // Kill containers associated with unwanted pods. - err = kl.killUnwantedPods(desiredPods, runningPods) - if err != nil { - glog.Errorf("Failed killing unwanted containers: %v", err) + for _, pod := range runningPods { + if _, found := desiredPods[pod.ID]; !found { + kl.podKillingCh <- pod + } } + kl.removeOrphanedPodStatuses(allPods, mirrorPods) // Note that we just killed the unwanted pods. This may not have reflected // in the cache. We need to bypass the cache to get the latest set of // running pods to clean up the volumes. @@ -1571,42 +1591,38 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro return err } -// killUnwantedPods kills the unwanted, running pods in parallel. -func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty, - runningPods []*kubecontainer.Pod) error { - ch := make(chan error, len(runningPods)) - defer close(ch) - numWorkers := 0 - for _, pod := range runningPods { - if _, found := desiredPods[pod.ID]; found { - // Per-pod workers will handle the desired pods. - continue - } - numWorkers++ - go func(pod *kubecontainer.Pod, ch chan error) { - var err error = nil - defer func() { - ch <- err - }() - glog.V(1).Infof("Killing unwanted pod %q", pod.Name) - // Stop the containers. - err = kl.killPod(nil, *pod) - if err != nil { - glog.Errorf("Failed killing the pod %q: %v", pod.Name, err) +// podKiller launches a goroutine to kill a pod received from the channel if +// another goroutine isn't already in action. +func (kl *Kubelet) podKiller() { + killing := util.NewStringSet() + resultCh := make(chan types.UID) + defer close(resultCh) + for { + select { + case pod, ok := <-kl.podKillingCh: + if !ok { return } - }(pod, ch) - } + if killing.Has(string(pod.ID)) { + // The pod is already being killed. + break + } + killing.Insert(string(pod.ID)) + go func(pod *kubecontainer.Pod, ch chan types.UID) { + defer func() { + ch <- pod.ID + }() + glog.V(2).Infof("Killing unwanted pod %q", pod.Name) + err := kl.killPod(nil, *pod) + if err != nil { + glog.Errorf("Failed killing the pod %q: %v", pod.Name, err) + } + }(pod, resultCh) - // Aggregate errors from the pod killing workers. - var errs []error - for i := 0; i < numWorkers; i++ { - err := <-ch - if err != nil { - errs = append(errs, err) + case podID := <-resultCh: + killing.Delete(string(podID)) } } - return utilErrors.NewAggregate(errs) } type podsByCreationTime []*api.Pod @@ -1624,44 +1640,34 @@ func (s podsByCreationTime) Less(i, j int) bool { } // checkHostPortConflicts detects pods with conflicted host ports. -func checkHostPortConflicts(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) { +func hasHostPortConflicts(pods []*api.Pod) bool { ports := util.StringSet{} - - // Respect the pod creation order when resolving conflicts. 
- sort.Sort(podsByCreationTime(pods)) - for _, pod := range pods { - if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports); len(errs) != 0 { + if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports); len(errs) > 0 { glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", kubecontainer.GetPodFullName(pod), errs) - notFitting = append(notFitting, pod) - continue + return true } - fitting = append(fitting, pod) } - return + return false } -// checkSufficientfFreeResources detects pods that exceeds node's resources. -func (kl *Kubelet) checkSufficientfFreeResources(pods []*api.Pod) (fitting []*api.Pod, notFittingCPU, notFittingMemory []*api.Pod) { +// hasInsufficientfFreeResources detects pods that exceeds node's resources. +// TODO: Consider integrate disk space into this function, and returns a +// suitable reason and message per resource type. +func (kl *Kubelet) hasInsufficientfFreeResources(pods []*api.Pod) (bool, bool) { info, err := kl.GetCachedMachineInfo() if err != nil { glog.Errorf("error getting machine info: %v", err) - return pods, nil, nil + // TODO: Should we admit the pod when machine info is unavailable? + return false, false } - - // Respect the pod creation order when resolving conflicts. - sort.Sort(podsByCreationTime(pods)) - capacity := CapacityFromMachineInfo(info) - return predicates.CheckPodsExceedingFreeResources(pods, capacity) + _, notFittingCPU, notFittingMemory := predicates.CheckPodsExceedingFreeResources(pods, capacity) + return len(notFittingCPU) > 0, len(notFittingMemory) > 0 } // handleOutOfDisk detects if pods can't fit due to lack of disk space. -func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod { - if len(podSyncTypes) == 0 { - // regular sync. no new pods - return pods - } +func (kl *Kubelet) isOutOfDisk() bool { outOfDockerDisk := false outOfRootDisk := false // Check disk space once globally and reject or accept all new pods. @@ -1681,120 +1687,53 @@ func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]S // Disk manager will only declare out of disk problems if unfreeze has been called. kl.diskSpaceManager.Unfreeze() - if !outOfDockerDisk && !outOfRootDisk { - // Disk space is fine. - return pods - } - - var fitting []*api.Pod - for i := range pods { - pod := pods[i] - // Only reject pods that didn't start yet. - if podSyncTypes[pod.UID] == SyncPodCreate { - reason := "OutOfDisk" - kl.recorder.Eventf(pod, reason, "Cannot start the pod due to lack of disk space.") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Reason: reason, - Message: "Pod cannot be started due to lack of disk space."}) - continue - } - fitting = append(fitting, pod) - } - return fitting + return outOfDockerDisk || outOfRootDisk } -// checkNodeSelectorMatching detects pods that do not match node's labels. -func (kl *Kubelet) checkNodeSelectorMatching(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) { +// matchesNodeSelector returns true if pod matches node's labels. 
+func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool { if kl.standaloneMode { - return pods, notFitting + return true } node, err := kl.GetNode() if err != nil { glog.Errorf("error getting node: %v", err) - return pods, nil + return true } - for _, pod := range pods { - if !predicates.PodMatchesNodeLabels(pod, node) { - notFitting = append(notFitting, pod) - continue - } - fitting = append(fitting, pod) - } - return + return predicates.PodMatchesNodeLabels(pod, node) } -// handleNotfittingPods handles pods that do not fit on the node and returns -// the pods that fit. It currently checks host port conflicts, node selector -// mismatches, and exceeded node capacity. -func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) []*api.Pod { - fitting, notFitting := checkHostPortConflicts(pods) - for _, pod := range notFitting { - reason := "HostPortConflict" - kl.recorder.Eventf(pod, reason, "Cannot start the pod due to host port conflict.") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Reason: reason, - Message: "Pod cannot be started due to host port conflict"}) - } - fitting, notFitting = kl.checkNodeSelectorMatching(fitting) - for _, pod := range notFitting { - reason := "NodeSelectorMismatching" - kl.recorder.Eventf(pod, reason, "Cannot start the pod due to node selector mismatch.") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Reason: reason, - Message: "Pod cannot be started due to node selector mismatch"}) - } - fitting, notFittingCPU, notFittingMemory := kl.checkSufficientfFreeResources(fitting) - for _, pod := range notFittingCPU { - reason := "InsufficientFreeCPU" - kl.recorder.Eventf(pod, reason, "Cannot start the pod due to insufficient free CPU.") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Reason: reason, - Message: "Pod cannot be started due to insufficient free CPU"}) - } - for _, pod := range notFittingMemory { - reason := "InsufficientFreeMemory" - kl.recorder.Eventf(pod, reason, "Cannot start the pod due to insufficient free memory.") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Reason: reason, - Message: "Pod cannot be started due to insufficient free memory"}) - } - return fitting +func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) { + kl.recorder.Eventf(pod, reason, message) + kl.statusManager.SetPodStatus(pod, api.PodStatus{ + Phase: api.PodFailed, + Reason: reason, + Message: "Pod " + message}) } -// admitPods handles pod admission. It filters out terminated pods, and pods -// that don't fit on the node, and may reject pods if node is overcommitted. -func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod { - // Pod phase progresses monotonically. Once a pod has reached a final state, - // it should never leave regardless of the restart policy. The statuses - // of such pods should not be changed, and there is no need to sync them. - // TODO: the logic here does not handle two cases: - // 1. If the containers were removed immediately after they died, kubelet - // may fail to generate correct statuses, let alone filtering correctly. - // 2. If kubelet restarted before writing the terminated status for a pod - // to the apiserver, it could still restart the terminated pod (even - // though the pod was not considered terminated by the apiserver). - // These two conditions could be alleviated by checkpointing kubelet. 
- pods := kl.filterOutTerminatedPods(allPods) +// canAdmitPod determines if a pod can be admitted, and gives a reason if it +// cannot. "pod" is new pod, while "pods" include all admitted pods plus the +// new pod. The function returns a boolean value indicating whether the pod +// can be admitted, a brief single-word reason and a message explaining why +// the pod cannot be admitted. +func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) { + if hasHostPortConflicts(pods) { + return false, "HostPortConflict", "cannot start the pod due to host port conflict." + } + if !kl.matchesNodeSelector(pod) { + return false, "NodeSelectorMismatching", "cannot be started due to node selector mismatch" + } + cpu, memory := kl.hasInsufficientfFreeResources(pods) + if cpu { + return false, "InsufficientFreeCPU", "cannot start the pod due to insufficient free CPU." + } else if memory { + return false, "InsufficientFreeMemory", "cannot be started due to insufficient free memory" + } + if kl.isOutOfDisk() { + return false, "OutOfDisk", "cannot be started due to lack of disk space." + } - // Respect the pod creation order when resolving conflicts. - sort.Sort(podsByCreationTime(pods)) - - // Reject pods that we cannot run. - // handleNotFittingPods relies on static information (e.g. immutable fields - // in the pod specs or machine information that doesn't change without - // rebooting), and the pods are sorted by immutable creation time. Hence it - // should only rejects new pods without checking the pod sync types. - fitting := kl.handleNotFittingPods(pods) - - // Reject new creation requests if diskspace is running low. - admittedPods := kl.handleOutOfDisk(fitting, podSyncTypes) - - return admittedPods + return true, "", "" } // syncLoop is the main loop for processing changes. It watches for changes from @@ -1821,39 +1760,124 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl glog.Infof("Skipping pod synchronization, network is not configured") return } - unsyncedPod := false - podSyncTypes := make(map[types.UID]SyncPodType) select { case u, ok := <-updates: if !ok { glog.Errorf("Update channel is closed. Exiting the sync loop.") return } - kl.podManager.UpdatePods(u, podSyncTypes) - unsyncedPod = true - kl.syncLoopMonitor.Store(time.Now()) + switch u.Op { + case ADD: + glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods)) + handler.HandlePodAdditions(u.Pods) + case UPDATE: + glog.V(2).Infof("SyncLoop (UPDATE): %q", kubeletUtil.FormatPodNames(u.Pods)) + handler.HandlePodUpdates(u.Pods) + case REMOVE: + glog.V(2).Infof("SyncLoop (REMOVE): %q", kubeletUtil.FormatPodNames(u.Pods)) + handler.HandlePodDeletions(u.Pods) + case SET: + // TODO: Do we want to support this? + glog.Errorf("Kubelet does not support snapshot update") + } case <-time.After(kl.resyncInterval): - glog.V(4).Infof("Periodic sync") - } - start := time.Now() - // If we already caught some update, try to wait for some short time - // to possibly batch it with other incoming updates. - for unsyncedPod { - select { - case u := <-updates: - kl.podManager.UpdatePods(u, podSyncTypes) - kl.syncLoopMonitor.Store(time.Now()) - case <-time.After(5 * time.Millisecond): - // Break the for loop. - unsyncedPod = false + // Periodically syncs all the pods and performs cleanup tasks. 
+ glog.V(4).Infof("SyncLoop (periodic sync)") + handler.HandlePodSyncs(kl.podManager.GetPods()) + if err := handler.HandlePodCleanups(); err != nil { + glog.Errorf("Failed cleaning pods: %v", err) } } - pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap() + kl.syncLoopMonitor.Store(time.Now()) - if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil { - glog.Errorf("Couldn't sync containers: %v", err) +} + +func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *api.Pod, start time.Time) { + if kl.podIsTerminated(pod) { + return + } + // Run the sync in an async worker. + kl.podWorkers.UpdatePod(pod, mirrorPod, func() { + metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start)) + }) + // Note the number of containers for new pods. + if syncType == SyncPodCreate { + metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) + } +} + +// TODO: Consider handling all mirror pods updates in a separate component. +func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) { + // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the + // corresponding static pod. Send update to the pod worker if the static + // pod exists. + if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok { + kl.dispatchWork(pod, SyncPodUpdate, mirrorPod, start) + } +} + +func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) { + start := time.Now() + sort.Sort(podsByCreationTime(pods)) + for _, pod := range pods { + kl.podManager.AddPod(pod) + if isMirrorPod(pod) { + kl.handleMirrorPod(pod, start) + continue + } + // Note that allPods includes the new pod since we added at the + // beginning of the loop. + allPods := kl.podManager.GetPods() + // We failed pods that we rejected, so activePods include all admitted + // pods that are alive and the new pod. + activePods := kl.filterOutTerminatedPods(allPods) + // Check if we can admit the pod; if not, reject it. + if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok { + kl.rejectPod(pod, reason, message) + continue + } + mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) + kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start) + } +} + +func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) { + start := time.Now() + for _, pod := range pods { + kl.podManager.UpdatePod(pod) + if isMirrorPod(pod) { + kl.handleMirrorPod(pod, start) + continue + } + // TODO: Evaluate if we need to validate and reject updates. + + mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) + kl.dispatchWork(pod, SyncPodUpdate, mirrorPod, start) + } +} + +func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) { + start := time.Now() + for _, pod := range pods { + kl.podManager.DeletePod(pod) + if isMirrorPod(pod) { + kl.handleMirrorPod(pod, start) + continue + } + // Deletion is allowed to fail because the periodic cleanup routine + // will trigger deletion again. 
+ if err := kl.deletePod(pod.UID); err != nil { + glog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err) + } + } +} + +func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) { + start := time.Now() + for _, pod := range pods { + mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) + kl.dispatchWork(pod, SyncPodSync, mirrorPod, start) } - kl.syncLoopMonitor.Store(time.Now()) } func (kl *Kubelet) LatestLoopEntryTime() time.Time { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index b6d5899fc2..d485c1a5e2 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -132,6 +132,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { fakeClock := &util.FakeClock{Time: time.Now()} kubelet.backOff = util.NewBackOff(time.Second, time.Minute) kubelet.backOff.Clock = fakeClock + kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20) return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient} } @@ -348,10 +349,7 @@ func TestSyncPodsStartPod(t *testing.T) { }, } kubelet.podManager.SetPods(pods) - err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodSyncs(pods) fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)}) } @@ -375,16 +373,12 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { }, }, } - if err := kubelet.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodCleanups() // Sources are not ready yet. Don't remove any pods. fakeRuntime.AssertKilledPods([]string{}) ready = true - if err := kubelet.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodCleanups() // Sources are ready. Remove unwanted pods. fakeRuntime.AssertKilledPods([]string{"12345678"}) @@ -2004,18 +1998,17 @@ func TestGetHostPortConflicts(t *testing.T) { {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}}, } // Pods should not cause any conflict. - _, conflicts := checkHostPortConflicts(pods) - if len(conflicts) != 0 { - t.Errorf("expected no conflicts, Got %#v", conflicts) + if hasHostPortConflicts(pods) { + t.Errorf("expected no conflicts, Got conflicts") } - // The new pod should cause conflict and be reported. expected := &api.Pod{ Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}, } + // The new pod should cause conflict and be reported. pods = append(pods, expected) - if _, actual := checkHostPortConflicts(pods); !reflect.DeepEqual(actual, []*api.Pod{expected}) { - t.Errorf("expected %#v, Got %#v", expected, actual) + if !hasHostPortConflicts(pods) { + t.Errorf("expected no conflict, Got no conflicts") } } @@ -2052,7 +2045,7 @@ func TestHandlePortConflicts(t *testing.T) { // The newer pod should be rejected. conflictedPod := pods[0] - kl.handleNotFittingPods(pods) + kl.HandlePodAdditions(pods) // Check pod status stored in the status map. status, found := kl.statusManager.GetPodStatus(conflictedPod.UID) if !found { @@ -2094,7 +2087,7 @@ func TestHandleNodeSelector(t *testing.T) { // The first pod should be rejected. notfittingPod := pods[0] - kl.handleNotFittingPods(pods) + kl.HandlePodAdditions(pods) // Check pod status stored in the status map. 
status, found := kl.statusManager.GetPodStatus(notfittingPod.UID) if !found { @@ -2142,7 +2135,7 @@ func TestHandleMemExceeded(t *testing.T) { // The newer pod should be rejected. notfittingPod := pods[0] - kl.handleNotFittingPods(pods) + kl.HandlePodAdditions(pods) // Check pod status stored in the status map. status, found := kl.statusManager.GetPodStatus(notfittingPod.UID) if !found { @@ -2167,12 +2160,13 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { } podToTest := pods[1] // Run once to populate the status map. - kl.handleNotFittingPods(pods) + kl.HandlePodAdditions(pods) if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found { t.Fatalf("expected to have status cached for pod2") } // Sync with empty pods so that the entry in status map will be removed. - kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) + kl.podManager.SetPods([]*api.Pod{}) + kl.HandlePodCleanups() if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found { t.Fatalf("expected to not have status cached for pod2") } @@ -2695,12 +2689,8 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) { } kl.podManager.SetPods(orphanPods) - pods, mirrorMap := kl.podManager.GetPodsAndMirrorMap() // Sync with an empty pod list to delete all mirror pods. - err := kl.SyncPods(pods, emptyPodUIDs, mirrorMap, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kl.HandlePodCleanups() if manager.NumOfPods() != 0 { t.Errorf("expected zero mirror pods, got %v", manager.GetPods()) } @@ -2802,7 +2792,7 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) { { ObjectMeta: api.ObjectMeta{ UID: "12345678", - Name: "foo", + Name: "staticFoo", Namespace: "new", Annotations: map[string]string{ ConfigSourceAnnotationKey: "file", @@ -2815,11 +2805,9 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) { }, }, } + kubelet.podManager.SetPods(pods) - err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodSyncs(kubelet.podManager.GetPods()) status, ok := kubelet.statusManager.GetPodStatus(pods[0].UID) if ok { t.Errorf("unexpected status %#v found for static pod %q", status, pods[0].UID) @@ -3147,10 +3135,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { } // Let the pod worker sets the status to fail after this sync. - err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodUpdates(pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) if !found { t.Errorf("expected to found status for pod %q", pods[0].UID) @@ -3201,10 +3186,7 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { } kubelet.podManager.SetPods(pods) - err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodUpdates(pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) if !found { t.Errorf("expected to found status for pod %q", pods[0].UID) @@ -3239,10 +3221,7 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) { kl.podManager.SetPods(pods) // Sync to create pod directories. 
- err := kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kl.HandlePodSyncs(kl.podManager.GetPods()) for i := range pods { if !dirExists(kl.getPodDir(pods[i].UID)) { t.Errorf("expected directory to exist for pod %d", i) @@ -3250,10 +3229,8 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) { } // Pod 1 has been deleted and no longer exists. - err = kl.SyncPods([]*api.Pod{pods[0]}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kl.podManager.SetPods([]*api.Pod{pods[0]}) + kl.HandlePodCleanups() if !dirExists(kl.getPodDir(pods[0].UID)) { t.Errorf("expected directory to exist for pod 0") } @@ -3294,10 +3271,7 @@ func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) { kl.podManager.SetPods(pods) // Sync to create pod directories. - err := kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kl.HandlePodSyncs(pods) for i := range pods { if !dirExists(kl.getPodDir(pods[i].UID)) { t.Errorf("expected directory to exist for pod %d", i) @@ -3307,7 +3281,7 @@ func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) { // deleted. kl.statusManager.SetPodStatus(pods[1], api.PodStatus{Phase: api.PodFailed}) kl.statusManager.SetPodStatus(pods[2], api.PodStatus{Phase: api.PodSucceeded}) - err = kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) + kl.HandlePodCleanups() for i := range pods { if !dirExists(kl.getPodDir(pods[i].UID)) { t.Errorf("expected directory to exist for pod %d", i) diff --git a/pkg/kubelet/pod_manager.go b/pkg/kubelet/pod_manager.go index afe721d764..1ea358accd 100644 --- a/pkg/kubelet/pod_manager.go +++ b/pkg/kubelet/pod_manager.go @@ -19,7 +19,6 @@ package kubelet import ( "sync" - "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -45,9 +44,19 @@ type podManager interface { GetPods() []*api.Pod GetPodByFullName(podFullName string) (*api.Pod, bool) GetPodByName(namespace, name string) (*api.Pod, bool) - GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod) + GetPodByMirrorPod(*api.Pod) (*api.Pod, bool) + GetMirrorPodByPod(*api.Pod) (*api.Pod, bool) + GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) + + // SetPods replaces the internal pods with the new pods. + // It is currently only used for testing. SetPods(pods []*api.Pod) - UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType) + + // Methods that modify a single pod. + AddPod(pod *api.Pod) + UpdatePod(pod *api.Pod) + DeletePod(pod *api.Pod) + DeleteOrphanedMirrorPods() TranslatePodUID(uid types.UID) types.UID IsMirrorPodOf(mirrorPod, pod *api.Pod) bool @@ -103,50 +112,6 @@ func newBasicPodManager(apiserverClient client.Interface) *basicPodManager { return pm } -// Update the internal pods with those provided by the update. -func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType) { - pm.lock.Lock() - defer pm.lock.Unlock() - switch u.Op { - case SET: - glog.V(3).Infof("SET: Containers changed") - // Store the new pods. Don't worry about filtering host ports since those - // pods will never be looked up. - existingPods := make(map[types.UID]struct{}) - for uid := range pm.podByUID { - existingPods[uid] = struct{}{} - } - - // Update the internal pods. 
- pm.setPods(u.Pods) - - for uid := range pm.podByUID { - if _, ok := existingPods[uid]; !ok { - podSyncTypes[uid] = SyncPodCreate - } - } - case UPDATE: - glog.V(3).Infof("Update: Containers changed") - - // Store the updated pods. Don't worry about filtering host ports since those - // pods will never be looked up. - for i := range u.Pods { - podSyncTypes[u.Pods[i].UID] = SyncPodUpdate - } - allPods := applyUpdates(u.Pods, pm.getAllPods()) - pm.setPods(allPods) - default: - panic("syncLoop does not support incremental changes") - } - - // Mark all remaining pods as sync. - for uid := range pm.podByUID { - if _, ok := podSyncTypes[uid]; !ok { - podSyncTypes[uid] = SyncPodSync - } - } -} - // Set the internal pods based on the new pods. func (pm *basicPodManager) SetPods(newPods []*api.Pod) { pm.lock.Lock() @@ -177,24 +142,34 @@ func (pm *basicPodManager) setPods(newPods []*api.Pod) { pm.mirrorPodByFullName = mirrorPodByFullName } -func applyUpdates(changed []*api.Pod, current []*api.Pod) []*api.Pod { - updated := []*api.Pod{} - m := map[types.UID]*api.Pod{} - for _, pod := range changed { - m[pod.UID] = pod - } +func (pm *basicPodManager) AddPod(pod *api.Pod) { + pm.UpdatePod(pod) +} - for _, pod := range current { - if m[pod.UID] != nil { - updated = append(updated, m[pod.UID]) - glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID]) - } else { - updated = append(updated, pod) - glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod) - } +func (pm *basicPodManager) UpdatePod(pod *api.Pod) { + pm.lock.Lock() + defer pm.lock.Unlock() + podFullName := kubecontainer.GetPodFullName(pod) + if isMirrorPod(pod) { + pm.mirrorPodByUID[pod.UID] = pod + pm.mirrorPodByFullName[podFullName] = pod + } else { + pm.podByUID[pod.UID] = pod + pm.podByFullName[podFullName] = pod } +} - return updated +func (pm *basicPodManager) DeletePod(pod *api.Pod) { + pm.lock.Lock() + defer pm.lock.Unlock() + podFullName := kubecontainer.GetPodFullName(pod) + if isMirrorPod(pod) { + delete(pm.mirrorPodByUID, pod.UID) + delete(pm.mirrorPodByFullName, podFullName) + } else { + delete(pm.podByUID, pod.UID) + delete(pm.podByFullName, podFullName) + } } // GetPods returns the regular pods bound to the kubelet and their spec. @@ -204,23 +179,20 @@ func (pm *basicPodManager) GetPods() []*api.Pod { return podsMapToPods(pm.podByUID) } +// GetPodsAndMirrorPods returns the both regular and mirror pods. +func (pm *basicPodManager) GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) { + pm.lock.RLock() + defer pm.lock.RUnlock() + pods := podsMapToPods(pm.podByUID) + mirrorPods := podsMapToPods(pm.mirrorPodByUID) + return pods, mirrorPods +} + // Returns all pods (including mirror pods). func (pm *basicPodManager) getAllPods() []*api.Pod { return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...) } -// GetPodsAndMirrorMap returns the a copy of the regular pods and the mirror -// pods indexed by full name. -func (pm *basicPodManager) GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod) { - pm.lock.RLock() - defer pm.lock.RUnlock() - mirrorPods := make(map[string]*api.Pod) - for key, pod := range pm.mirrorPodByFullName { - mirrorPods[key] = pod - } - return podsMapToPods(pm.podByUID), mirrorPods -} - // GetPodByName provides the (non-mirror) pod that matches namespace and name, // as well as whether the pod was found. 
func (pm *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) { @@ -295,3 +267,17 @@ func podsMapToPods(UIDMap map[types.UID]*api.Pod) []*api.Pod { } return pods } + +func (pm *basicPodManager) GetMirrorPodByPod(pod *api.Pod) (*api.Pod, bool) { + pm.lock.RLock() + defer pm.lock.RUnlock() + mirrorPod, ok := pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)] + return mirrorPod, ok +} + +func (pm *basicPodManager) GetPodByMirrorPod(mirrorPod *api.Pod) (*api.Pod, bool) { + pm.lock.RLock() + defer pm.lock.RUnlock() + pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)] + return pod, ok +} diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 5cc96e0457..e508e32488 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -32,6 +32,7 @@ import ( type PodWorkers interface { UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) + ForgetWorker(uid types.UID) } type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod, SyncPodType) error @@ -171,19 +172,30 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete } } +func (p *podWorkers) removeWorker(uid types.UID) { + if ch, ok := p.podUpdates[uid]; ok { + close(ch) + delete(p.podUpdates, uid) + // If there is an undelivered work update for this pod we need to remove it + // since per-pod goroutine won't be able to put it to the already closed + // channel when it finish processing the current work update. + if _, cached := p.lastUndeliveredWorkUpdate[uid]; cached { + delete(p.lastUndeliveredWorkUpdate, uid) + } + } +} +func (p *podWorkers) ForgetWorker(uid types.UID) { + p.podLock.Lock() + defer p.podLock.Unlock() + p.removeWorker(uid) +} + func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) { p.podLock.Lock() defer p.podLock.Unlock() - for key, channel := range p.podUpdates { + for key := range p.podUpdates { if _, exists := desiredPods[key]; !exists { - close(channel) - delete(p.podUpdates, key) - // If there is an undelivered work update for this pod we need to remove it - // since per-pod goroutine won't be able to put it to the already closed - // channel when it finish processing the current work update. - if _, cached := p.lastUndeliveredWorkUpdate[key]; cached { - delete(p.lastUndeliveredWorkUpdate, key) - } + p.removeWorker(key) } } } diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index e73a636b36..e2c992803f 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -53,10 +53,15 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) { // runOnce runs a given set of pods and returns their status. func (kl *Kubelet) runOnce(pods []*api.Pod, retryDelay time.Duration) (results []RunPodResult, err error) { - kl.handleNotFittingPods(pods) - ch := make(chan RunPodResult) + admitted := []*api.Pod{} for _, pod := range pods { + // Check if we can admit the pod. 
+ if ok, reason, message := kl.canAdmitPod(append(admitted, pod), pod); !ok { + kl.rejectPod(pod, reason, message) + } else { + admitted = append(admitted, pod) + } go func(pod *api.Pod) { err := kl.runPod(pod, retryDelay) ch <- RunPodResult{pod, err} diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index b4b3ebec49..6f8d21bed1 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -76,6 +76,7 @@ func TestRunOnce(t *testing.T) { cadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) podManager, _ := newFakePodManager() + diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{}) kb := &Kubelet{ rootDirectory: "/tmp/kubelet", @@ -88,6 +89,7 @@ func TestRunOnce(t *testing.T) { podManager: podManager, os: kubecontainer.FakeOS{}, volumeManager: newVolumeManager(), + diskSpaceManager: diskSpaceManager, } kb.containerManager, _ = newContainerManager(cadvisor, "", "", "")
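
For reference, below is a minimal, standalone sketch of the deduplicating pod-killing loop this patch introduces (the podKiller goroutine in pkg/kubelet/kubelet.go). The Pod struct, the killPod stub, and the timings are simplified stand-ins rather than the actual kubelet types; they only illustrate the channel/goroutine pattern.

package main

import (
	"fmt"
	"time"
)

// Pod is a simplified stand-in for kubecontainer.Pod.
type Pod struct {
	ID   string
	Name string
}

// killPod stands in for Kubelet.killPod; it may block for the pod's
// graceful-termination period.
func killPod(p *Pod) error {
	time.Sleep(100 * time.Millisecond)
	return nil
}

// podKiller drains the kill channel and kills each pod in its own goroutine.
// The "killing" set prevents launching a second kill for a pod whose kill is
// still in flight; resultCh reports completions so entries can be cleared.
func podKiller(podKillingCh <-chan *Pod) {
	killing := map[string]bool{}
	resultCh := make(chan string)
	for {
		select {
		case pod, ok := <-podKillingCh:
			if !ok {
				return
			}
			if killing[pod.ID] {
				break // already being killed; drop the duplicate request
			}
			killing[pod.ID] = true
			go func(pod *Pod) {
				defer func() { resultCh <- pod.ID }()
				if err := killPod(pod); err != nil {
					fmt.Printf("failed killing pod %q: %v\n", pod.Name, err)
				}
			}(pod)
		case id := <-resultCh:
			delete(killing, id)
		}
	}
}

func main() {
	ch := make(chan *Pod, 50) // mirrors podKillingChannelCapacity
	go podKiller(ch)
	ch <- &Pod{ID: "12345678", Name: "foo"}
	ch <- &Pod{ID: "12345678", Name: "foo"} // dropped: a kill is already in flight
	time.Sleep(500 * time.Millisecond)
}

Because the select statement always remains ready to receive on resultCh, a slow kill (for example, one waiting out a long graceful-termination period) never blocks the intake of new kill requests, and duplicate requests for a pod whose kill is still in progress are simply ignored.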