diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go
index 4f571739ba..fcdb015566 100644
--- a/cmd/kubelet/app/server.go
+++ b/cmd/kubelet/app/server.go
@@ -688,7 +688,7 @@ func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfi
 
 func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
 	// source of all configuration
-	cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates, kc.Recorder)
+	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)
 
 	// define file config source
 	if kc.ConfigFile != "" {
diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go
index 1735009129..b37209218d 100644
--- a/contrib/mesos/pkg/executor/service/service.go
+++ b/contrib/mesos/pkg/executor/service/service.go
@@ -504,8 +504,6 @@ func (kl *kubeletExecutor) Run(updates <-chan kubelet.PodUpdate) {
 	util.Until(func() { kl.Kubelet.Run(pipe) }, 0, kl.executorDone)
 
 	//TODO(jdef) revisit this if/when executor failover lands
-	err := kl.SyncPods([]*api.Pod{}, nil, nil, time.Now())
-	if err != nil {
-		log.Errorf("failed to cleanly remove all pods and associated state: %v", err)
-	}
+	// Force kubelet to delete all pods.
+	kl.HandlePodDeletions(kl.GetPods())
 }
diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go
index 93b48519ff..6b0736d3b5 100644
--- a/pkg/kubelet/config/config.go
+++ b/pkg/kubelet/config/config.go
@@ -249,7 +249,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
 		}
 
 	case kubelet.SET:
-		glog.V(4).Infof("Setting pods for source %s : %v", source, update)
+		glog.V(4).Infof("Setting pods for source %s", source)
 		s.markSourceSet(source)
 		// Clear the old map entries by just creating a new map
 		oldPods := pods
diff --git a/pkg/kubelet/fake_pod_workers.go b/pkg/kubelet/fake_pod_workers.go
index e25b5505f0..7f036e60fd 100644
--- a/pkg/kubelet/fake_pod_workers.go
+++ b/pkg/kubelet/fake_pod_workers.go
@@ -42,6 +42,8 @@ func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateCompl
 
 func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {}
 
+func (f *fakePodWorkers) ForgetWorker(uid types.UID) {}
+
 type TestingInterface interface {
 	Errorf(format string, args ...interface{})
 }
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index ea6ad0fc61..ca5661d6a5 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -82,6 +82,11 @@ const (
 
 	// max backoff period
 	maxContainerBackOff = 300 * time.Second
+
+	// Capacity of the channel for storing pods to kill. A small number should
+	// suffice because a goroutine is dedicated to checking the channel and does
+	// not block on anything else.
+	podKillingChannelCapacity = 50
 )
 
 var (
@@ -92,11 +97,11 @@ var (
 
 // SyncHandler is an interface implemented by Kubelet, for testability
 type SyncHandler interface {
-	// Syncs current state to match the specified pods. SyncPodType specified what
-	// type of sync is occurring per pod. StartTime specifies the time at which
-	// syncing began (for use in monitoring).
- SyncPods(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, mirrorPods map[string]*api.Pod, - startTime time.Time) error + HandlePodAdditions(pods []*api.Pod) + HandlePodUpdates(pods []*api.Pod) + HandlePodDeletions(pods []*api.Pod) + HandlePodSyncs(pods []*api.Pod) + HandlePodCleanups() error } type SourcesReadyFn func() bool @@ -377,6 +382,8 @@ func NewMainKubelet( } klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff) + klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity) + return klet, nil } @@ -532,6 +539,9 @@ type Kubelet struct { // Container restart Backoff backOff *util.Backoff + + // Channel for sending pods to kill. + podKillingCh chan *kubecontainer.Pod } // getRootDir returns the full path to the directory under which kubelet can @@ -745,6 +755,10 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop) + // Start a goroutine responsible for killing pods (that are not properly + // handled by pod workers). + go util.Until(kl.podKiller, 1*time.Second, util.NeverStop) + // Run the system oom watcher forever. kl.statusManager.Start() kl.syncLoop(updates, kl) @@ -1227,7 +1241,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont // // If we end up here with a create event for an already running pod, it could result in a // restart of its containers. This cannot happen unless the kubelet restarts, because the - // delete before the second create is processed by SyncPods, which cancels this pod worker. + // delete before the second create would cancel this pod worker. // // If the kubelet restarts, we have a bunch of running containers for which we get create // events. This is ok, because the pod status for these will include the podIp and terminated @@ -1421,58 +1435,38 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool { return false } -//podIsTerminated returns true if status is in one of the terminated state. -func podIsTerminated(status *api.PodStatus) bool { +// Returns true if pod is in the terminated state ("Failed" or "Succeeded"). +func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool { + var status api.PodStatus + // Check the cached pod status which was set after the last sync. + status, ok := kl.statusManager.GetPodStatus(pod.UID) + if !ok { + // If there is no cached status, use the status from the + // apiserver. This is useful if kubelet has recently been + // restarted. + status = pod.Status + } if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded { return true } + return false } -// Filter out pods in the terminated state ("Failed" or "Succeeded"). -func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod { - var pods []*api.Pod - for _, pod := range allPods { - var status api.PodStatus - // Check the cached pod status which was set after the last sync. - status, ok := kl.statusManager.GetPodStatus(pod.UID) - if !ok { - // If there is no cached status, use the status from the - // apiserver. This is useful if kubelet has recently been - // restarted. - status = pod.Status - } - if podIsTerminated(&status) { +func (kl *Kubelet) filterOutTerminatedPods(pods []*api.Pod) []*api.Pod { + var filteredPods []*api.Pod + for _, p := range pods { + if kl.podIsTerminated(p) { continue } - pods = append(pods, pod) + filteredPods = append(filteredPods, p) } - return pods -} - -// SyncPods synchronizes the configured list of pods (desired state) with the host current state. 
-func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType,
-	mirrorPods map[string]*api.Pod, start time.Time) error {
-	defer func() {
-		metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
-	}()
-
-	kl.removeOrphanedPodStatuses(allPods, mirrorPods)
-	// Handles pod admission.
-	pods := kl.admitPods(allPods, podSyncTypes)
-	glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
-	// Send updates to pod workers.
-	kl.dispatchWork(pods, podSyncTypes, mirrorPods, start)
-	// Clean up unwanted/orphaned resources.
-	if err := kl.cleanupPods(allPods, pods); err != nil {
-		return err
-	}
-	return nil
+	return filteredPods
 }
 
 // removeOrphanedPodStatuses removes obsolete entries in podStatus where
 // the pod is no longer considered bound to this node.
-func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[string]*api.Pod) {
+func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api.Pod) {
 	podUIDs := make(map[types.UID]bool)
 	for _, pod := range pods {
 		podUIDs[pod.UID] = true
@@ -1483,53 +1477,79 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods map[str
 	kl.statusManager.RemoveOrphanedStatuses(podUIDs)
 }
 
-// dispatchWork dispatches pod updates to workers.
-func (kl *Kubelet) dispatchWork(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType,
-	mirrorPods map[string]*api.Pod, start time.Time) {
-	// Check for any containers that need starting
-	for _, pod := range pods {
-		podFullName := kubecontainer.GetPodFullName(pod)
-		// Run the sync in an async manifest worker.
-		kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() {
-			metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
-		})
-		// Note the number of containers for new pods.
-		if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) {
-			metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
-		}
-	}
-}
-
-// cleanupPods performs a series of cleanup work, including terminating pod
-// workers, killing unwanted pods, and removing orphaned volumes/pod
-// directories.
-func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) error {
-	desiredPods := make(map[types.UID]empty)
-	for _, pod := range admittedPods {
-		desiredPods[pod.UID] = empty{}
-	}
-	// Stop the workers for no-longer existing pods.
-	kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
-
+func (kl *Kubelet) deletePod(uid types.UID) error {
 	if !kl.sourcesReady() {
 		// If the sources aren't ready, skip deletion, as we may accidentally delete pods
 		// for sources that haven't reported yet.
-		glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
+		return fmt.Errorf("skipping delete because sources aren't ready yet")
+	}
+	kl.podWorkers.ForgetWorker(uid)
+
+	// Runtime cache may not have been updated with the pod, but it's okay
+	// because the periodic cleanup routine will attempt to delete again later.
+	runningPods, err := kl.runtimeCache.GetPods()
+	if err != nil {
+		return fmt.Errorf("error listing containers: %v", err)
+	}
+	pod := kubecontainer.Pods(runningPods).FindPod("", uid)
+	if pod.IsEmpty() {
+		return fmt.Errorf("pod not found")
+	}
+
+	kl.podKillingCh <- &pod
+	// TODO: delete the mirror pod here?
+
+	// We leave the volume/directory cleanup to the periodic cleanup routine.
+	return nil
+}
+
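Reviewer note on the `kl.podKillingCh <- &pod` send in deletePod above: it blocks if the 50-slot buffer ever fills. Under the PR's stated assumption (a dedicated goroutine drains the channel and blocks on nothing else) that should not happen in practice. For illustration only, here is a standalone sketch of a non-blocking variant that would instead defer to the periodic cleanup; this is not part of the PR, and the types are stand-ins:

```go
package main

import "fmt"

type killRequest struct{ name string }

// requestKill is a hypothetical non-blocking stand-in for the
// `kl.podKillingCh <- &pod` send above; it is not part of the PR.
func requestKill(ch chan *killRequest, req *killRequest) bool {
	select {
	case ch <- req:
		// Normal case: the buffer has room and the killer goroutine drains it.
		return true
	default:
		// Buffer full: drop the request. The periodic HandlePodCleanups pass
		// would re-detect the still-running pod and retry the kill.
		fmt.Printf("kill channel full; deferring %s to periodic cleanup\n", req.name)
		return false
	}
}

func main() {
	ch := make(chan *killRequest, 1) // tiny buffer to demonstrate the fallback
	fmt.Println(requestKill(ch, &killRequest{"pod-a"})) // true
	fmt.Println(requestKill(ch, &killRequest{"pod-b"})) // false: buffer is full
}
```

Dropping would only be safe because the cleanup routine re-detects unwanted running pods on every periodic sync.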
+// HandlePodCleanups performs a series of cleanup work, including terminating
+// pod workers, killing unwanted pods, and removing orphaned volumes/pod
+// directories.
+// TODO(yujuhong): This function is executed by the main sync loop, so it
+// should not contain any blocking calls. Re-examine the function and decide
+// whether or not we should move it into a separate goroutine.
+func (kl *Kubelet) HandlePodCleanups() error {
+	if !kl.sourcesReady() {
+		// If the sources aren't ready, skip deletion, as we may accidentally delete pods
+		// for sources that haven't reported yet.
+		glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.")
 		return nil
 	}
+	allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
+	// Pod phase progresses monotonically. Once a pod has reached a final state,
+	// it should never leave regardless of the restart policy. The statuses
+	// of such pods should not be changed, and there is no need to sync them.
+	// TODO: the logic here does not handle two cases:
+	//   1. If the containers were removed immediately after they died, kubelet
+	//      may fail to generate correct statuses, let alone filtering correctly.
+	//   2. If kubelet restarted before writing the terminated status for a pod
+	//      to the apiserver, it could still restart the terminated pod (even
+	//      though the pod was not considered terminated by the apiserver).
+	// These two conditions could be alleviated by checkpointing kubelet.
+	activePods := kl.filterOutTerminatedPods(allPods)
+
+	desiredPods := make(map[types.UID]empty)
+	for _, pod := range activePods {
+		desiredPods[pod.UID] = empty{}
+	}
+	// Stop the workers for no-longer existing pods.
+	// TODO: is this the best place to forget pod workers?
+	kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
+
 	runningPods, err := kl.runtimeCache.GetPods()
 	if err != nil {
 		glog.Errorf("Error listing containers: %#v", err)
 		return err
 	}
-
-	// Kill containers associated with unwanted pods.
-	err = kl.killUnwantedPods(desiredPods, runningPods)
-	if err != nil {
-		glog.Errorf("Failed killing unwanted containers: %v", err)
+	for _, pod := range runningPods {
+		if _, found := desiredPods[pod.ID]; !found {
+			kl.podKillingCh <- pod
+		}
 	}
+	kl.removeOrphanedPodStatuses(allPods, mirrorPods)
 	// Note that we just killed the unwanted pods. This may not have reflected
 	// in the cache. We need to bypass the cache to get the latest set of
 	// running pods to clean up the volumes.
@@ -1571,42 +1591,38 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro
 	return err
 }
 
-// killUnwantedPods kills the unwanted, running pods in parallel.
-func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
-	runningPods []*kubecontainer.Pod) error {
-	ch := make(chan error, len(runningPods))
-	defer close(ch)
-	numWorkers := 0
-	for _, pod := range runningPods {
-		if _, found := desiredPods[pod.ID]; found {
-			// Per-pod workers will handle the desired pods.
-			continue
-		}
-		numWorkers++
-		go func(pod *kubecontainer.Pod, ch chan error) {
-			var err error = nil
-			defer func() {
-				ch <- err
-			}()
-			glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
-			// Stop the containers.
-			err = kl.killPod(nil, *pod)
-			if err != nil {
-				glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
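The removed killUnwantedPods killed a batch of pods in parallel and aggregated the errors; the podKiller introduced below is instead a long-lived loop that drains the kill channel and skips pods whose kill is already in flight. A distilled, standalone sketch of that dedup pattern (stand-in pod type and fmt logging; not the kubelet's API):

```go
package main

import (
	"fmt"
	"time"
)

type pod struct{ id, name string }

// killer mirrors the podKiller loop below: a set of in-flight IDs plus a
// result channel, with one goroutine per kill. Sketch only.
func killer(requests <-chan *pod) {
	inFlight := map[string]bool{}
	results := make(chan string)
	for {
		select {
		case p, ok := <-requests:
			if !ok {
				return
			}
			if inFlight[p.id] {
				break // a kill for this pod is already running; drop the duplicate
			}
			inFlight[p.id] = true
			go func(p *pod) {
				time.Sleep(10 * time.Millisecond) // stand-in for killPod
				fmt.Printf("killed %s\n", p.name)
				results <- p.id
			}(p)
		case id := <-results:
			delete(inFlight, id) // completed; a later request may kill it again
		}
	}
}

func main() {
	reqs := make(chan *pod, 4)
	go killer(reqs)
	reqs <- &pod{"u1", "pod-a"}
	reqs <- &pod{"u1", "pod-a"} // duplicate while in flight: skipped
	time.Sleep(50 * time.Millisecond) // crude: let the sketch finish printing
}
```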
+// podKiller launches a goroutine to kill a pod received from the channel if
+// no other goroutine is already killing that pod.
+func (kl *Kubelet) podKiller() {
+	killing := util.NewStringSet()
+	resultCh := make(chan types.UID)
+	defer close(resultCh)
+	for {
+		select {
+		case pod, ok := <-kl.podKillingCh:
+			if !ok {
 				return
 			}
-		}(pod, ch)
-	}
+			if killing.Has(string(pod.ID)) {
+				// The pod is already being killed.
+				break
+			}
+			killing.Insert(string(pod.ID))
+			go func(pod *kubecontainer.Pod, ch chan types.UID) {
+				defer func() {
+					ch <- pod.ID
+				}()
+				glog.V(2).Infof("Killing unwanted pod %q", pod.Name)
+				err := kl.killPod(nil, *pod)
+				if err != nil {
+					glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
+				}
+			}(pod, resultCh)
 
-	// Aggregate errors from the pod killing workers.
-	var errs []error
-	for i := 0; i < numWorkers; i++ {
-		err := <-ch
-		if err != nil {
-			errs = append(errs, err)
+		case podID := <-resultCh:
+			killing.Delete(string(podID))
 		}
 	}
-	return utilErrors.NewAggregate(errs)
 }
 
 type podsByCreationTime []*api.Pod
@@ -1624,44 +1640,34 @@ func (s podsByCreationTime) Less(i, j int) bool {
 }
 
-// checkHostPortConflicts detects pods with conflicted host ports.
-func checkHostPortConflicts(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) {
+// hasHostPortConflicts detects pods with conflicted host ports.
+func hasHostPortConflicts(pods []*api.Pod) bool {
 	ports := util.StringSet{}
-
-	// Respect the pod creation order when resolving conflicts.
-	sort.Sort(podsByCreationTime(pods))
-
 	for _, pod := range pods {
-		if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports); len(errs) != 0 {
+		if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports); len(errs) > 0 {
 			glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", kubecontainer.GetPodFullName(pod), errs)
-			notFitting = append(notFitting, pod)
-			continue
+			return true
 		}
-		fitting = append(fitting, pod)
 	}
-	return
+	return false
 }
 
-// checkSufficientfFreeResources detects pods that exceeds node's resources.
-func (kl *Kubelet) checkSufficientfFreeResources(pods []*api.Pod) (fitting []*api.Pod, notFittingCPU, notFittingMemory []*api.Pod) {
+// hasInsufficientFreeResources detects pods that exceed the node's resources.
+// TODO: Consider integrating disk space into this function, and returning a
+// suitable reason and message per resource type.
+func (kl *Kubelet) hasInsufficientFreeResources(pods []*api.Pod) (bool, bool) {
 	info, err := kl.GetCachedMachineInfo()
 	if err != nil {
 		glog.Errorf("error getting machine info: %v", err)
-		return pods, nil, nil
+		// TODO: Should we admit the pod when machine info is unavailable?
+		return false, false
 	}
-
-	// Respect the pod creation order when resolving conflicts.
-	sort.Sort(podsByCreationTime(pods))
-
 	capacity := CapacityFromMachineInfo(info)
-	return predicates.CheckPodsExceedingFreeResources(pods, capacity)
+	_, notFittingCPU, notFittingMemory := predicates.CheckPodsExceedingFreeResources(pods, capacity)
+	return len(notFittingCPU) > 0, len(notFittingMemory) > 0
 }
 
-// handleOutOfDisk detects if pods can't fit due to lack of disk space.
-func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod {
-	if len(podSyncTypes) == 0 {
-		// regular sync. no new pods
-		return pods
-	}
+// isOutOfDisk detects if the node is out of disk space.
+func (kl *Kubelet) isOutOfDisk() bool {
 	outOfDockerDisk := false
 	outOfRootDisk := false
 	// Check disk space once globally and reject or accept all new pods.
@@ -1681,120 +1687,53 @@ func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]S
 	// Disk manager will only declare out of disk problems if unfreeze has been called.
kl.diskSpaceManager.Unfreeze() - if !outOfDockerDisk && !outOfRootDisk { - // Disk space is fine. - return pods - } - - var fitting []*api.Pod - for i := range pods { - pod := pods[i] - // Only reject pods that didn't start yet. - if podSyncTypes[pod.UID] == SyncPodCreate { - reason := "OutOfDisk" - kl.recorder.Eventf(pod, reason, "Cannot start the pod due to lack of disk space.") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Reason: reason, - Message: "Pod cannot be started due to lack of disk space."}) - continue - } - fitting = append(fitting, pod) - } - return fitting + return outOfDockerDisk || outOfRootDisk } -// checkNodeSelectorMatching detects pods that do not match node's labels. -func (kl *Kubelet) checkNodeSelectorMatching(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) { +// matchesNodeSelector returns true if pod matches node's labels. +func (kl *Kubelet) matchesNodeSelector(pod *api.Pod) bool { if kl.standaloneMode { - return pods, notFitting + return true } node, err := kl.GetNode() if err != nil { glog.Errorf("error getting node: %v", err) - return pods, nil + return true } - for _, pod := range pods { - if !predicates.PodMatchesNodeLabels(pod, node) { - notFitting = append(notFitting, pod) - continue - } - fitting = append(fitting, pod) - } - return + return predicates.PodMatchesNodeLabels(pod, node) } -// handleNotfittingPods handles pods that do not fit on the node and returns -// the pods that fit. It currently checks host port conflicts, node selector -// mismatches, and exceeded node capacity. -func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) []*api.Pod { - fitting, notFitting := checkHostPortConflicts(pods) - for _, pod := range notFitting { - reason := "HostPortConflict" - kl.recorder.Eventf(pod, reason, "Cannot start the pod due to host port conflict.") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Reason: reason, - Message: "Pod cannot be started due to host port conflict"}) - } - fitting, notFitting = kl.checkNodeSelectorMatching(fitting) - for _, pod := range notFitting { - reason := "NodeSelectorMismatching" - kl.recorder.Eventf(pod, reason, "Cannot start the pod due to node selector mismatch.") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Reason: reason, - Message: "Pod cannot be started due to node selector mismatch"}) - } - fitting, notFittingCPU, notFittingMemory := kl.checkSufficientfFreeResources(fitting) - for _, pod := range notFittingCPU { - reason := "InsufficientFreeCPU" - kl.recorder.Eventf(pod, reason, "Cannot start the pod due to insufficient free CPU.") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Reason: reason, - Message: "Pod cannot be started due to insufficient free CPU"}) - } - for _, pod := range notFittingMemory { - reason := "InsufficientFreeMemory" - kl.recorder.Eventf(pod, reason, "Cannot start the pod due to insufficient free memory.") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Reason: reason, - Message: "Pod cannot be started due to insufficient free memory"}) - } - return fitting +func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) { + kl.recorder.Eventf(pod, reason, message) + kl.statusManager.SetPodStatus(pod, api.PodStatus{ + Phase: api.PodFailed, + Reason: reason, + Message: "Pod " + message}) } -// admitPods handles pod admission. 
 It filters out terminated pods, and pods
-// that don't fit on the node, and may reject pods if node is overcommitted.
-func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod {
-	// Pod phase progresses monotonically. Once a pod has reached a final state,
-	// it should never leave regardless of the restart policy. The statuses
-	// of such pods should not be changed, and there is no need to sync them.
-	// TODO: the logic here does not handle two cases:
-	//   1. If the containers were removed immediately after they died, kubelet
-	//      may fail to generate correct statuses, let alone filtering correctly.
-	//   2. If kubelet restarted before writing the terminated status for a pod
-	//      to the apiserver, it could still restart the terminated pod (even
-	//      though the pod was not considered terminated by the apiserver).
-	// These two conditions could be alleviated by checkpointing kubelet.
-	pods := kl.filterOutTerminatedPods(allPods)
+// canAdmitPod determines if a pod can be admitted, and gives a reason if it
+// cannot. "pod" is the new pod, while "pods" includes all admitted pods plus
+// the new pod. The function returns a boolean value indicating whether the
+// pod can be admitted, a brief single-word reason, and a message explaining
+// why the pod cannot be admitted.
+func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) {
+	if hasHostPortConflicts(pods) {
+		return false, "HostPortConflict", "cannot be started due to host port conflict."
+	}
+	if !kl.matchesNodeSelector(pod) {
+		return false, "NodeSelectorMismatching", "cannot be started due to node selector mismatch."
+	}
+	cpu, memory := kl.hasInsufficientFreeResources(pods)
+	if cpu {
+		return false, "InsufficientFreeCPU", "cannot be started due to insufficient free CPU."
+	} else if memory {
+		return false, "InsufficientFreeMemory", "cannot be started due to insufficient free memory."
+	}
+	if kl.isOutOfDisk() {
+		return false, "OutOfDisk", "cannot be started due to lack of disk space."
+	}
 
-	// Respect the pod creation order when resolving conflicts.
-	sort.Sort(podsByCreationTime(pods))
-
-	// Reject pods that we cannot run.
-	// handleNotFittingPods relies on static information (e.g. immutable fields
-	// in the pod specs or machine information that doesn't change without
-	// rebooting), and the pods are sorted by immutable creation time. Hence it
-	// should only rejects new pods without checking the pod sync types.
-	fitting := kl.handleNotFittingPods(pods)
-
-	// Reject new creation requests if diskspace is running low.
-	admittedPods := kl.handleOutOfDisk(fitting, podSyncTypes)
-
-	return admittedPods
+	return true, "", ""
 }
 
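canAdmitPod reports only the first failing predicate, and callers are expected to pass the already-admitted pods plus the candidate. A standalone sketch of that calling convention, with a single stand-in predicate and hypothetical types (the real checks are the ones above):

```go
package main

import "fmt"

type pod struct {
	name     string
	hostPort int
}

// canAdmit mimics the (ok, reason, message) contract: first failure wins.
func canAdmit(pods []*pod, candidate *pod) (bool, string, string) {
	seen := map[int]bool{}
	for _, p := range pods { // "pods" already includes the candidate
		if p.hostPort != 0 && seen[p.hostPort] {
			return false, "HostPortConflict", "cannot be started due to host port conflict."
		}
		seen[p.hostPort] = true
	}
	return true, "", ""
}

func main() {
	admitted := []*pod{}
	for _, p := range []*pod{{"a", 80}, {"b", 80}, {"c", 0}} {
		// Ask about the admitted set plus the candidate, as HandlePodAdditions does.
		if ok, reason, msg := canAdmit(append(admitted, p), p); !ok {
			fmt.Printf("reject %s: %s (%s)\n", p.name, reason, msg)
			continue
		}
		admitted = append(admitted, p)
	}
	fmt.Println("admitted:", len(admitted)) // 2
}
```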
 // syncLoop is the main loop for processing changes. It watches for changes from
@@ -1821,39 +1760,124 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl
 		glog.Infof("Skipping pod synchronization, network is not configured")
 		return
 	}
-	unsyncedPod := false
-	podSyncTypes := make(map[types.UID]SyncPodType)
 	select {
 	case u, ok := <-updates:
 		if !ok {
 			glog.Errorf("Update channel is closed. Exiting the sync loop.")
 			return
 		}
-		kl.podManager.UpdatePods(u, podSyncTypes)
-		unsyncedPod = true
-		kl.syncLoopMonitor.Store(time.Now())
+		switch u.Op {
+		case ADD:
+			glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods))
+			handler.HandlePodAdditions(u.Pods)
+		case UPDATE:
+			glog.V(2).Infof("SyncLoop (UPDATE): %q", kubeletUtil.FormatPodNames(u.Pods))
+			handler.HandlePodUpdates(u.Pods)
+		case REMOVE:
+			glog.V(2).Infof("SyncLoop (REMOVE): %q", kubeletUtil.FormatPodNames(u.Pods))
+			handler.HandlePodDeletions(u.Pods)
+		case SET:
+			// TODO: Do we want to support this?
+			glog.Errorf("Kubelet does not support snapshot update")
+		}
 	case <-time.After(kl.resyncInterval):
-		glog.V(4).Infof("Periodic sync")
-	}
-	start := time.Now()
-	// If we already caught some update, try to wait for some short time
-	// to possibly batch it with other incoming updates.
-	for unsyncedPod {
-		select {
-		case u := <-updates:
-			kl.podManager.UpdatePods(u, podSyncTypes)
-			kl.syncLoopMonitor.Store(time.Now())
-		case <-time.After(5 * time.Millisecond):
-			// Break the for loop.
-			unsyncedPod = false
+		// Periodically syncs all the pods and performs cleanup tasks.
+		glog.V(4).Infof("SyncLoop (periodic sync)")
+		handler.HandlePodSyncs(kl.podManager.GetPods())
+		if err := handler.HandlePodCleanups(); err != nil {
+			glog.Errorf("Failed cleaning pods: %v", err)
 		}
 	}
-	pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
+
 	kl.syncLoopMonitor.Store(time.Now())
-	if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
-		glog.Errorf("Couldn't sync containers: %v", err)
+}
+
+func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *api.Pod, start time.Time) {
+	if kl.podIsTerminated(pod) {
+		return
+	}
+	// Run the sync in an async worker.
+	kl.podWorkers.UpdatePod(pod, mirrorPod, func() {
+		metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
+	})
+	// Note the number of containers for new pods.
+	if syncType == SyncPodCreate {
+		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
+	}
+}
+
+// TODO: Consider handling all mirror pods updates in a separate component.
+func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) {
+	// Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
+	// corresponding static pod. Send update to the pod worker if the static
+	// pod exists.
+	if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
+		kl.dispatchWork(pod, SyncPodUpdate, mirrorPod, start)
+	}
+}
+
+func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
+	start := time.Now()
+	sort.Sort(podsByCreationTime(pods))
+	for _, pod := range pods {
+		kl.podManager.AddPod(pod)
+		if isMirrorPod(pod) {
+			kl.handleMirrorPod(pod, start)
+			continue
+		}
+		// Note that allPods includes the new pod since we added it at the
+		// beginning of the loop.
+		allPods := kl.podManager.GetPods()
+		// Rejected pods have been marked failed, so activePods includes only
+		// the admitted pods that are still alive, plus the new pod.
+		activePods := kl.filterOutTerminatedPods(allPods)
+		// Check if we can admit the pod; if not, reject it.
+ if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok { + kl.rejectPod(pod, reason, message) + continue + } + mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) + kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start) + } +} + +func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) { + start := time.Now() + for _, pod := range pods { + kl.podManager.UpdatePod(pod) + if isMirrorPod(pod) { + kl.handleMirrorPod(pod, start) + continue + } + // TODO: Evaluate if we need to validate and reject updates. + + mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) + kl.dispatchWork(pod, SyncPodUpdate, mirrorPod, start) + } +} + +func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) { + start := time.Now() + for _, pod := range pods { + kl.podManager.DeletePod(pod) + if isMirrorPod(pod) { + kl.handleMirrorPod(pod, start) + continue + } + // Deletion is allowed to fail because the periodic cleanup routine + // will trigger deletion again. + if err := kl.deletePod(pod.UID); err != nil { + glog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err) + } + } +} + +func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) { + start := time.Now() + for _, pod := range pods { + mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) + kl.dispatchWork(pod, SyncPodSync, mirrorPod, start) } - kl.syncLoopMonitor.Store(time.Now()) } func (kl *Kubelet) LatestLoopEntryTime() time.Time { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index b6d5899fc2..d485c1a5e2 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -132,6 +132,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { fakeClock := &util.FakeClock{Time: time.Now()} kubelet.backOff = util.NewBackOff(time.Second, time.Minute) kubelet.backOff.Clock = fakeClock + kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20) return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient} } @@ -348,10 +349,7 @@ func TestSyncPodsStartPod(t *testing.T) { }, } kubelet.podManager.SetPods(pods) - err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodSyncs(pods) fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)}) } @@ -375,16 +373,12 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { }, }, } - if err := kubelet.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodCleanups() // Sources are not ready yet. Don't remove any pods. fakeRuntime.AssertKilledPods([]string{}) ready = true - if err := kubelet.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodCleanups() // Sources are ready. Remove unwanted pods. fakeRuntime.AssertKilledPods([]string{"12345678"}) @@ -2004,18 +1998,17 @@ func TestGetHostPortConflicts(t *testing.T) { {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}}, } // Pods should not cause any conflict. - _, conflicts := checkHostPortConflicts(pods) - if len(conflicts) != 0 { - t.Errorf("expected no conflicts, Got %#v", conflicts) + if hasHostPortConflicts(pods) { + t.Errorf("expected no conflicts, Got conflicts") } - // The new pod should cause conflict and be reported. 
 	expected := &api.Pod{
 		Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
 	}
+	// The new pod should cause conflict and be reported.
 	pods = append(pods, expected)
-	if _, actual := checkHostPortConflicts(pods); !reflect.DeepEqual(actual, []*api.Pod{expected}) {
-		t.Errorf("expected %#v, Got %#v", expected, actual)
+	if !hasHostPortConflicts(pods) {
+		t.Errorf("expected conflict, got no conflicts")
 	}
 }
 
@@ -2052,7 +2045,7 @@ func TestHandlePortConflicts(t *testing.T) {
 
 	// The newer pod should be rejected.
 	conflictedPod := pods[0]
-	kl.handleNotFittingPods(pods)
+	kl.HandlePodAdditions(pods)
 	// Check pod status stored in the status map.
 	status, found := kl.statusManager.GetPodStatus(conflictedPod.UID)
 	if !found {
@@ -2094,7 +2087,7 @@ func TestHandleNodeSelector(t *testing.T) {
 
 	// The first pod should be rejected.
 	notfittingPod := pods[0]
-	kl.handleNotFittingPods(pods)
+	kl.HandlePodAdditions(pods)
 	// Check pod status stored in the status map.
 	status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
 	if !found {
@@ -2142,7 +2135,7 @@ func TestHandleMemExceeded(t *testing.T) {
 
 	// The newer pod should be rejected.
 	notfittingPod := pods[0]
-	kl.handleNotFittingPods(pods)
+	kl.HandlePodAdditions(pods)
 	// Check pod status stored in the status map.
 	status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
 	if !found {
@@ -2167,12 +2160,13 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
 	}
 	podToTest := pods[1]
 	// Run once to populate the status map.
-	kl.handleNotFittingPods(pods)
+	kl.HandlePodAdditions(pods)
 	if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found {
 		t.Fatalf("expected to have status cached for pod2")
 	}
 	// Sync with empty pods so that the entry in status map will be removed.
-	kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
+	kl.podManager.SetPods([]*api.Pod{})
+	kl.HandlePodCleanups()
 	if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found {
 		t.Fatalf("expected to not have status cached for pod2")
 	}
@@ -2695,12 +2689,8 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
 	}
 	kl.podManager.SetPods(orphanPods)
-	pods, mirrorMap := kl.podManager.GetPodsAndMirrorMap()
 	// Sync with an empty pod list to delete all mirror pods.
-	err := kl.SyncPods(pods, emptyPodUIDs, mirrorMap, time.Now())
-	if err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
+	kl.HandlePodCleanups()
 	if manager.NumOfPods() != 0 {
 		t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
 	}
@@ -2802,7 +2792,7 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) {
 		{
 			ObjectMeta: api.ObjectMeta{
 				UID:       "12345678",
-				Name:      "foo",
+				Name:      "staticFoo",
 				Namespace: "new",
 				Annotations: map[string]string{
 					ConfigSourceAnnotationKey: "file",
@@ -2815,11 +2805,9 @@
 			},
 		},
 	}
+	kubelet.podManager.SetPods(pods)
 
-	err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now())
-	if err != nil {
-		t.Errorf("unexpected error: %v", err)
-	}
+	kubelet.HandlePodSyncs(kubelet.podManager.GetPods())
 	status, ok := kubelet.statusManager.GetPodStatus(pods[0].UID)
 	if ok {
 		t.Errorf("unexpected status %#v found for static pod %q", status, pods[0].UID)
@@ -3147,10 +3135,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
 	}
 
 	// Let the pod worker sets the status to fail after this sync.
- err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodUpdates(pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) if !found { t.Errorf("expected to found status for pod %q", pods[0].UID) @@ -3201,10 +3186,7 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { } kubelet.podManager.SetPods(pods) - err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kubelet.HandlePodUpdates(pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) if !found { t.Errorf("expected to found status for pod %q", pods[0].UID) @@ -3239,10 +3221,7 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) { kl.podManager.SetPods(pods) // Sync to create pod directories. - err := kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kl.HandlePodSyncs(kl.podManager.GetPods()) for i := range pods { if !dirExists(kl.getPodDir(pods[i].UID)) { t.Errorf("expected directory to exist for pod %d", i) @@ -3250,10 +3229,8 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) { } // Pod 1 has been deleted and no longer exists. - err = kl.SyncPods([]*api.Pod{pods[0]}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kl.podManager.SetPods([]*api.Pod{pods[0]}) + kl.HandlePodCleanups() if !dirExists(kl.getPodDir(pods[0].UID)) { t.Errorf("expected directory to exist for pod 0") } @@ -3294,10 +3271,7 @@ func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) { kl.podManager.SetPods(pods) // Sync to create pod directories. - err := kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + kl.HandlePodSyncs(pods) for i := range pods { if !dirExists(kl.getPodDir(pods[i].UID)) { t.Errorf("expected directory to exist for pod %d", i) @@ -3307,7 +3281,7 @@ func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) { // deleted. kl.statusManager.SetPodStatus(pods[1], api.PodStatus{Phase: api.PodFailed}) kl.statusManager.SetPodStatus(pods[2], api.PodStatus{Phase: api.PodSucceeded}) - err = kl.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) + kl.HandlePodCleanups() for i := range pods { if !dirExists(kl.getPodDir(pods[i].UID)) { t.Errorf("expected directory to exist for pod %d", i) diff --git a/pkg/kubelet/pod_manager.go b/pkg/kubelet/pod_manager.go index afe721d764..1ea358accd 100644 --- a/pkg/kubelet/pod_manager.go +++ b/pkg/kubelet/pod_manager.go @@ -19,7 +19,6 @@ package kubelet import ( "sync" - "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -45,9 +44,19 @@ type podManager interface { GetPods() []*api.Pod GetPodByFullName(podFullName string) (*api.Pod, bool) GetPodByName(namespace, name string) (*api.Pod, bool) - GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod) + GetPodByMirrorPod(*api.Pod) (*api.Pod, bool) + GetMirrorPodByPod(*api.Pod) (*api.Pod, bool) + GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) + + // SetPods replaces the internal pods with the new pods. + // It is currently only used for testing. SetPods(pods []*api.Pod) - UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType) + + // Methods that modify a single pod. 
+	AddPod(pod *api.Pod)
+	UpdatePod(pod *api.Pod)
+	DeletePod(pod *api.Pod)
+
 	DeleteOrphanedMirrorPods()
 	TranslatePodUID(uid types.UID) types.UID
 	IsMirrorPodOf(mirrorPod, pod *api.Pod) bool
@@ -103,50 +112,6 @@ func newBasicPodManager(apiserverClient client.Interface) *basicPodManager {
 	return pm
 }
 
-// Update the internal pods with those provided by the update.
-func (pm *basicPodManager) UpdatePods(u PodUpdate, podSyncTypes map[types.UID]SyncPodType) {
-	pm.lock.Lock()
-	defer pm.lock.Unlock()
-	switch u.Op {
-	case SET:
-		glog.V(3).Infof("SET: Containers changed")
-		// Store the new pods. Don't worry about filtering host ports since those
-		// pods will never be looked up.
-		existingPods := make(map[types.UID]struct{})
-		for uid := range pm.podByUID {
-			existingPods[uid] = struct{}{}
-		}
-
-		// Update the internal pods.
-		pm.setPods(u.Pods)
-
-		for uid := range pm.podByUID {
-			if _, ok := existingPods[uid]; !ok {
-				podSyncTypes[uid] = SyncPodCreate
-			}
-		}
-	case UPDATE:
-		glog.V(3).Infof("Update: Containers changed")
-
-		// Store the updated pods. Don't worry about filtering host ports since those
-		// pods will never be looked up.
-		for i := range u.Pods {
-			podSyncTypes[u.Pods[i].UID] = SyncPodUpdate
-		}
-		allPods := applyUpdates(u.Pods, pm.getAllPods())
-		pm.setPods(allPods)
-	default:
-		panic("syncLoop does not support incremental changes")
-	}
-
-	// Mark all remaining pods as sync.
-	for uid := range pm.podByUID {
-		if _, ok := podSyncTypes[uid]; !ok {
-			podSyncTypes[uid] = SyncPodSync
-		}
-	}
-}
-
 // Set the internal pods based on the new pods.
 func (pm *basicPodManager) SetPods(newPods []*api.Pod) {
 	pm.lock.Lock()
@@ -177,24 +142,34 @@ func (pm *basicPodManager) setPods(newPods []*api.Pod) {
 	pm.mirrorPodByFullName = mirrorPodByFullName
 }
 
-func applyUpdates(changed []*api.Pod, current []*api.Pod) []*api.Pod {
-	updated := []*api.Pod{}
-	m := map[types.UID]*api.Pod{}
-	for _, pod := range changed {
-		m[pod.UID] = pod
-	}
+func (pm *basicPodManager) AddPod(pod *api.Pod) {
+	pm.UpdatePod(pod)
+}
 
-	for _, pod := range current {
-		if m[pod.UID] != nil {
-			updated = append(updated, m[pod.UID])
-			glog.V(4).Infof("pod with UID: %q has a new spec %+v", pod.UID, *m[pod.UID])
-		} else {
-			updated = append(updated, pod)
-			glog.V(4).Infof("pod with UID: %q stay with the same spec %+v", pod.UID, *pod)
-		}
+func (pm *basicPodManager) UpdatePod(pod *api.Pod) {
+	pm.lock.Lock()
+	defer pm.lock.Unlock()
+	podFullName := kubecontainer.GetPodFullName(pod)
+	if isMirrorPod(pod) {
+		pm.mirrorPodByUID[pod.UID] = pod
+		pm.mirrorPodByFullName[podFullName] = pod
+	} else {
+		pm.podByUID[pod.UID] = pod
+		pm.podByFullName[podFullName] = pod
 	}
+}
 
-	return updated
+func (pm *basicPodManager) DeletePod(pod *api.Pod) {
+	pm.lock.Lock()
+	defer pm.lock.Unlock()
+	podFullName := kubecontainer.GetPodFullName(pod)
+	if isMirrorPod(pod) {
+		delete(pm.mirrorPodByUID, pod.UID)
+		delete(pm.mirrorPodByFullName, podFullName)
+	} else {
+		delete(pm.podByUID, pod.UID)
+		delete(pm.podByFullName, podFullName)
+	}
 }
 
 // GetPods returns the regular pods bound to the kubelet and their spec.
@@ -204,23 +179,20 @@ func (pm *basicPodManager) GetPods() []*api.Pod {
 	return podsMapToPods(pm.podByUID)
 }
 
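AddPod/UpdatePod/DeletePod above keep two indexes (by UID and by full name) in step, and AddPod is deliberately just an upsert. A standalone sketch of the same dual-index pattern (stand-in types, no locking or mirror-pod split; not the kubelet's API):

```go
package main

import "fmt"

type pod struct{ uid, fullName string }

// manager mirrors basicPodManager's two indexes; an update is an upsert into both.
type manager struct {
	byUID      map[string]*pod
	byFullName map[string]*pod
}

func (m *manager) update(p *pod) { // AddPod delegates here in the PR
	m.byUID[p.uid] = p
	m.byFullName[p.fullName] = p
}

func (m *manager) remove(p *pod) {
	delete(m.byUID, p.uid)
	delete(m.byFullName, p.fullName)
}

func main() {
	m := &manager{byUID: map[string]*pod{}, byFullName: map[string]*pod{}}
	p := &pod{"uid-1", "foo_default"}
	m.update(p) // add
	m.update(p) // update: same upsert path
	fmt.Println(len(m.byUID), len(m.byFullName)) // 1 1
	m.remove(p)
	fmt.Println(len(m.byUID), len(m.byFullName)) // 0 0
}
```

The real code additionally dispatches on isMirrorPod into a second pair of maps and holds a lock around every mutation.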
+// GetPodsAndMirrorPods returns both the regular and mirror pods.
+func (pm *basicPodManager) GetPodsAndMirrorPods() ([]*api.Pod, []*api.Pod) {
+	pm.lock.RLock()
+	defer pm.lock.RUnlock()
+	pods := podsMapToPods(pm.podByUID)
+	mirrorPods := podsMapToPods(pm.mirrorPodByUID)
+	return pods, mirrorPods
+}
+
 // Returns all pods (including mirror pods).
 func (pm *basicPodManager) getAllPods() []*api.Pod {
 	return append(podsMapToPods(pm.podByUID), podsMapToPods(pm.mirrorPodByUID)...)
 }
 
-// GetPodsAndMirrorMap returns the a copy of the regular pods and the mirror
-// pods indexed by full name.
-func (pm *basicPodManager) GetPodsAndMirrorMap() ([]*api.Pod, map[string]*api.Pod) {
-	pm.lock.RLock()
-	defer pm.lock.RUnlock()
-	mirrorPods := make(map[string]*api.Pod)
-	for key, pod := range pm.mirrorPodByFullName {
-		mirrorPods[key] = pod
-	}
-	return podsMapToPods(pm.podByUID), mirrorPods
-}
-
 // GetPodByName provides the (non-mirror) pod that matches namespace and name,
 // as well as whether the pod was found.
 func (pm *basicPodManager) GetPodByName(namespace, name string) (*api.Pod, bool) {
@@ -295,3 +267,17 @@ func podsMapToPods(UIDMap map[types.UID]*api.Pod) []*api.Pod {
 	}
 	return pods
 }
+
+func (pm *basicPodManager) GetMirrorPodByPod(pod *api.Pod) (*api.Pod, bool) {
+	pm.lock.RLock()
+	defer pm.lock.RUnlock()
+	mirrorPod, ok := pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)]
+	return mirrorPod, ok
+}
+
+func (pm *basicPodManager) GetPodByMirrorPod(mirrorPod *api.Pod) (*api.Pod, bool) {
+	pm.lock.RLock()
+	defer pm.lock.RUnlock()
+	pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)]
+	return pod, ok
+}
diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go
index 5cc96e0457..e508e32488 100644
--- a/pkg/kubelet/pod_workers.go
+++ b/pkg/kubelet/pod_workers.go
@@ -32,6 +32,7 @@ import (
 type PodWorkers interface {
 	UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func())
 	ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
+	ForgetWorker(uid types.UID)
 }
 
 type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod, SyncPodType) error
@@ -171,19 +172,31 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete
 	}
 }
 
+func (p *podWorkers) removeWorker(uid types.UID) {
+	if ch, ok := p.podUpdates[uid]; ok {
+		close(ch)
+		delete(p.podUpdates, uid)
+		// If there is an undelivered work update for this pod we need to remove it
+		// since the per-pod goroutine won't be able to put it to the already closed
+		// channel when it finishes processing the current work update.
+		if _, cached := p.lastUndeliveredWorkUpdate[uid]; cached {
+			delete(p.lastUndeliveredWorkUpdate, uid)
+		}
+	}
+}
+
+func (p *podWorkers) ForgetWorker(uid types.UID) {
+	p.podLock.Lock()
+	defer p.podLock.Unlock()
+	p.removeWorker(uid)
+}
+
 func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {
 	p.podLock.Lock()
 	defer p.podLock.Unlock()
-	for key, channel := range p.podUpdates {
+	for key := range p.podUpdates {
 		if _, exists := desiredPods[key]; !exists {
-			close(channel)
-			delete(p.podUpdates, key)
-			// If there is an undelivered work update for this pod we need to remove it
-			// since per-pod goroutine won't be able to put it to the already closed
-			// channel when it finish processing the current work update.
-			if _, cached := p.lastUndeliveredWorkUpdate[key]; cached {
-				delete(p.lastUndeliveredWorkUpdate, key)
-			}
+			p.removeWorker(key)
 		}
 	}
 }
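ForgetWorker/removeWorker stop a worker by closing its update channel; dropping lastUndeliveredWorkUpdate matters because a queued update could otherwise be written to the closed channel and panic. A standalone sketch of that close-to-stop lifecycle (stand-in types; the undelivered-update bookkeeping is omitted):

```go
package main

import (
	"fmt"
	"sync"
)

// registry owns one update channel per worker; closing a channel tells the
// draining goroutine to exit. Sketch only: a stand-in for podWorkers.
type registry struct {
	mu      sync.Mutex
	updates map[string]chan string
	wg      sync.WaitGroup
}

func (r *registry) ensureWorker(id string) chan string {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ch, ok := r.updates[id]; ok {
		return ch
	}
	ch := make(chan string, 1)
	r.updates[id] = ch
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		for u := range ch { // exits when forget() closes the channel
			fmt.Printf("worker %s handling %s\n", id, u)
		}
	}()
	return ch
}

func (r *registry) forget(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ch, ok := r.updates[id]; ok {
		close(ch)
		delete(r.updates, id)
	}
}

func main() {
	r := &registry{updates: map[string]chan string{}}
	r.ensureWorker("pod-a") <- "sync"
	r.forget("pod-a") // worker drains the buffered update, then exits
	r.wg.Wait()
}
```

Sending to a forgotten worker's channel would panic, which is exactly why the real removeWorker also discards the pending update.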
diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go
index e73a636b36..e2c992803f 100644
--- a/pkg/kubelet/runonce.go
+++ b/pkg/kubelet/runonce.go
@@ -53,10 +53,15 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) {
 
 // runOnce runs a given set of pods and returns their status.
 func (kl *Kubelet) runOnce(pods []*api.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
-	kl.handleNotFittingPods(pods)
-
 	ch := make(chan RunPodResult)
+	admitted := []*api.Pod{}
 	for _, pod := range pods {
+		// Check if we can admit the pod.
+		if ok, reason, message := kl.canAdmitPod(append(admitted, pod), pod); !ok {
+			kl.rejectPod(pod, reason, message)
+		} else {
+			admitted = append(admitted, pod)
+		}
 		go func(pod *api.Pod) {
 			err := kl.runPod(pod, retryDelay)
 			ch <- RunPodResult{pod, err}
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index b4b3ebec49..6f8d21bed1 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -76,6 +76,7 @@ func TestRunOnce(t *testing.T) {
 	cadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
 
 	podManager, _ := newFakePodManager()
+	diskSpaceManager, _ := newDiskSpaceManager(cadvisor, DiskSpacePolicy{})
 
 	kb := &Kubelet{
 		rootDirectory:  "/tmp/kubelet",
@@ -88,6 +89,7 @@ func TestRunOnce(t *testing.T) {
 		podManager:     podManager,
 		os:             kubecontainer.FakeOS{},
 		volumeManager:  newVolumeManager(),
+		diskSpaceManager: diskSpaceManager,
 	}
 	kb.containerManager, _ = newContainerManager(cadvisor, "", "", "")
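Taken together with the server.go switch to PodConfigNotificationIncremental, the sync loop now consumes per-operation updates instead of snapshots, and the SyncHandler interface splits accordingly. A standalone sketch of that dispatch shape (stand-in types and string "pods"; not the kubelet's actual types):

```go
package main

import "fmt"

type op int

const (
	add op = iota
	update
	remove
)

type podUpdate struct {
	op   op
	pods []string
}

// handler mirrors the SyncHandler split: one method per incremental op.
type handler interface {
	HandlePodAdditions(pods []string)
	HandlePodUpdates(pods []string)
	HandlePodDeletions(pods []string)
}

type logHandler struct{}

func (logHandler) HandlePodAdditions(p []string) { fmt.Println("ADD", p) }
func (logHandler) HandlePodUpdates(p []string)   { fmt.Println("UPDATE", p) }
func (logHandler) HandlePodDeletions(p []string) { fmt.Println("REMOVE", p) }

// syncLoop is the distilled analogue of syncLoopIteration's op switch.
func syncLoop(updates <-chan podUpdate, h handler) {
	for u := range updates {
		switch u.op {
		case add:
			h.HandlePodAdditions(u.pods)
		case update:
			h.HandlePodUpdates(u.pods)
		case remove:
			h.HandlePodDeletions(u.pods)
		}
	}
}

func main() {
	ch := make(chan podUpdate, 3)
	ch <- podUpdate{add, []string{"pod-a"}}
	ch <- podUpdate{update, []string{"pod-a"}}
	ch <- podUpdate{remove, []string{"pod-a"}}
	close(ch)
	syncLoop(ch, logHandler{})
}
```

The periodic-resync branch (HandlePodSyncs plus HandlePodCleanups) is omitted here; it plays the role the old batch SyncPods used to.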