mirror of https://github.com/k3s-io/k3s
delete volumes before pod deletion
parent
74cf0484c3
commit
67cb2704c5
|
@ -182,6 +182,7 @@ go_test(
|
||||||
"//pkg/kubelet/server/remotecommand:go_default_library",
|
"//pkg/kubelet/server/remotecommand:go_default_library",
|
||||||
"//pkg/kubelet/server/stats:go_default_library",
|
"//pkg/kubelet/server/stats:go_default_library",
|
||||||
"//pkg/kubelet/status:go_default_library",
|
"//pkg/kubelet/status:go_default_library",
|
||||||
|
"//pkg/kubelet/status/testing:go_default_library",
|
||||||
"//pkg/kubelet/types:go_default_library",
|
"//pkg/kubelet/types:go_default_library",
|
||||||
"//pkg/kubelet/util/queue:go_default_library",
|
"//pkg/kubelet/util/queue:go_default_library",
|
||||||
"//pkg/kubelet/util/sliceutils:go_default_library",
|
"//pkg/kubelet/util/sliceutils:go_default_library",
|
||||||
|
|
|
@ -690,7 +690,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
||||||
}
|
}
|
||||||
klet.imageManager = imageManager
|
klet.imageManager = imageManager
|
||||||
|
|
||||||
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager)
|
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
|
||||||
|
|
||||||
klet.probeManager = prober.NewManager(
|
klet.probeManager = prober.NewManager(
|
||||||
klet.statusManager,
|
klet.statusManager,
|
||||||
|
@ -715,6 +715,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
||||||
kubeCfg.EnableControllerAttachDetach,
|
kubeCfg.EnableControllerAttachDetach,
|
||||||
nodeName,
|
nodeName,
|
||||||
klet.podManager,
|
klet.podManager,
|
||||||
|
klet.statusManager,
|
||||||
klet.kubeClient,
|
klet.kubeClient,
|
||||||
klet.volumePluginMgr,
|
klet.volumePluginMgr,
|
||||||
klet.containerRuntime,
|
klet.containerRuntime,
|
||||||
|
|
|
@ -728,6 +728,37 @@ func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns true if all required node-level resources that a pod was consuming have been reclaimed by the kubelet.
|
||||||
|
// Reclaiming resources is a prerequisite to deleting a pod from the API server.
|
||||||
|
func (kl *Kubelet) OkToDeletePod(pod *v1.Pod) bool {
|
||||||
|
if pod.DeletionTimestamp == nil {
|
||||||
|
// We shouldnt delete pods whose DeletionTimestamp is not set
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if !notRunning(pod.Status.ContainerStatuses) {
|
||||||
|
// We shouldnt delete pods that still have running containers
|
||||||
|
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if kl.podVolumesExist(pod.UID) && !kl.kubeletConfiguration.KeepTerminatedPodVolumes {
|
||||||
|
// We shouldnt delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
|
||||||
|
glog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// notRunning returns true if every status is terminated or waiting, or the status list
|
||||||
|
// is empty.
|
||||||
|
func notRunning(statuses []v1.ContainerStatus) bool {
|
||||||
|
for _, status := range statuses {
|
||||||
|
if status.State.Terminated == nil && status.State.Waiting == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// filterOutTerminatedPods returns the given pods which the status manager
|
// filterOutTerminatedPods returns the given pods which the status manager
|
||||||
// does not consider failed or succeeded.
|
// does not consider failed or succeeded.
|
||||||
func (kl *Kubelet) filterOutTerminatedPods(pods []*v1.Pod) []*v1.Pod {
|
func (kl *Kubelet) filterOutTerminatedPods(pods []*v1.Pod) []*v1.Pod {
|
||||||
|
|
|
@ -60,6 +60,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/kubelet/secret"
|
"k8s.io/kubernetes/pkg/kubelet/secret"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/server/stats"
|
"k8s.io/kubernetes/pkg/kubelet/server/stats"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||||
|
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
||||||
kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
||||||
|
@ -180,7 +181,7 @@ func newTestKubeletWithImageList(
|
||||||
}
|
}
|
||||||
kubelet.secretManager = secretManager
|
kubelet.secretManager = secretManager
|
||||||
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager)
|
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient, kubelet.secretManager)
|
||||||
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager)
|
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{})
|
||||||
kubelet.containerRefManager = kubecontainer.NewRefManager()
|
kubelet.containerRefManager = kubecontainer.NewRefManager()
|
||||||
diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{})
|
diskSpaceManager, err := newDiskSpaceManager(mockCadvisor, DiskSpacePolicy{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -262,6 +263,7 @@ func newTestKubeletWithImageList(
|
||||||
controllerAttachDetachEnabled,
|
controllerAttachDetachEnabled,
|
||||||
kubelet.nodeName,
|
kubelet.nodeName,
|
||||||
kubelet.podManager,
|
kubelet.podManager,
|
||||||
|
kubelet.statusManager,
|
||||||
fakeKubeClient,
|
fakeKubeClient,
|
||||||
kubelet.volumePluginMgr,
|
kubelet.volumePluginMgr,
|
||||||
fakeRuntime,
|
fakeRuntime,
|
||||||
|
|
|
@ -56,6 +56,7 @@ go_test(
|
||||||
"//pkg/kubelet/pod:go_default_library",
|
"//pkg/kubelet/pod:go_default_library",
|
||||||
"//pkg/kubelet/prober/results:go_default_library",
|
"//pkg/kubelet/prober/results:go_default_library",
|
||||||
"//pkg/kubelet/status:go_default_library",
|
"//pkg/kubelet/status:go_default_library",
|
||||||
|
"//pkg/kubelet/status/testing:go_default_library",
|
||||||
"//pkg/probe:go_default_library",
|
"//pkg/probe:go_default_library",
|
||||||
"//pkg/util/exec:go_default_library",
|
"//pkg/util/exec:go_default_library",
|
||||||
"//vendor:github.com/golang/glog",
|
"//vendor:github.com/golang/glog",
|
||||||
|
|
|
@ -28,6 +28,7 @@ import (
|
||||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||||
|
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
|
||||||
"k8s.io/kubernetes/pkg/probe"
|
"k8s.io/kubernetes/pkg/probe"
|
||||||
"k8s.io/kubernetes/pkg/util/exec"
|
"k8s.io/kubernetes/pkg/util/exec"
|
||||||
)
|
)
|
||||||
|
@ -102,7 +103,7 @@ func newTestManager() *manager {
|
||||||
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
|
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
|
||||||
podManager.AddPod(getTestPod())
|
podManager.AddPod(getTestPod())
|
||||||
m := NewManager(
|
m := NewManager(
|
||||||
status.NewManager(&fake.Clientset{}, podManager),
|
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}),
|
||||||
results.NewManager(),
|
results.NewManager(),
|
||||||
nil, // runner
|
nil, // runner
|
||||||
refManager,
|
refManager,
|
||||||
|
|
|
@ -31,6 +31,7 @@ import (
|
||||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
"k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||||
|
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
|
||||||
"k8s.io/kubernetes/pkg/probe"
|
"k8s.io/kubernetes/pkg/probe"
|
||||||
"k8s.io/kubernetes/pkg/util/exec"
|
"k8s.io/kubernetes/pkg/util/exec"
|
||||||
)
|
)
|
||||||
|
@ -117,7 +118,7 @@ func TestDoProbe(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up.
|
// Clean up.
|
||||||
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil))
|
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil, nil), &statustest.FakePodDeletionSafetyProvider{})
|
||||||
resultsManager(m, probeType).Remove(testContainerID)
|
resultsManager(m, probeType).Remove(testContainerID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/kubelet/secret"
|
"k8s.io/kubernetes/pkg/kubelet/secret"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/server/stats"
|
"k8s.io/kubernetes/pkg/kubelet/server/stats"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||||
|
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||||
|
@ -77,7 +78,7 @@ func TestRunOnce(t *testing.T) {
|
||||||
cadvisor: cadvisor,
|
cadvisor: cadvisor,
|
||||||
nodeLister: testNodeLister{},
|
nodeLister: testNodeLister{},
|
||||||
nodeInfo: testNodeInfo{},
|
nodeInfo: testNodeInfo{},
|
||||||
statusManager: status.NewManager(nil, podManager),
|
statusManager: status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}),
|
||||||
containerRefManager: kubecontainer.NewRefManager(),
|
containerRefManager: kubecontainer.NewRefManager(),
|
||||||
podManager: podManager,
|
podManager: podManager,
|
||||||
os: &containertest.FakeOS{},
|
os: &containertest.FakeOS{},
|
||||||
|
@ -102,6 +103,7 @@ func TestRunOnce(t *testing.T) {
|
||||||
true,
|
true,
|
||||||
kb.nodeName,
|
kb.nodeName,
|
||||||
kb.podManager,
|
kb.podManager,
|
||||||
|
kb.statusManager,
|
||||||
kb.kubeClient,
|
kb.kubeClient,
|
||||||
kb.volumePluginMgr,
|
kb.volumePluginMgr,
|
||||||
fakeRuntime,
|
fakeRuntime,
|
||||||
|
|
|
@ -51,6 +51,7 @@ go_test(
|
||||||
"//pkg/kubelet/pod:go_default_library",
|
"//pkg/kubelet/pod:go_default_library",
|
||||||
"//pkg/kubelet/pod/testing:go_default_library",
|
"//pkg/kubelet/pod/testing:go_default_library",
|
||||||
"//pkg/kubelet/secret:go_default_library",
|
"//pkg/kubelet/secret:go_default_library",
|
||||||
|
"//pkg/kubelet/status/testing:go_default_library",
|
||||||
"//pkg/kubelet/types:go_default_library",
|
"//pkg/kubelet/types:go_default_library",
|
||||||
"//vendor:github.com/stretchr/testify/assert",
|
"//vendor:github.com/stretchr/testify/assert",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
||||||
|
@ -70,6 +71,9 @@ filegroup(
|
||||||
|
|
||||||
filegroup(
|
filegroup(
|
||||||
name = "all-srcs",
|
name = "all-srcs",
|
||||||
srcs = [":package-srcs"],
|
srcs = [
|
||||||
|
":package-srcs",
|
||||||
|
"//pkg/kubelet/status/testing:all-srcs",
|
||||||
|
],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
)
|
)
|
||||||
|
|
|
@ -67,6 +67,7 @@ type manager struct {
|
||||||
// Map from (mirror) pod UID to latest status version successfully sent to the API server.
|
// Map from (mirror) pod UID to latest status version successfully sent to the API server.
|
||||||
// apiStatusVersions must only be accessed from the sync thread.
|
// apiStatusVersions must only be accessed from the sync thread.
|
||||||
apiStatusVersions map[types.UID]uint64
|
apiStatusVersions map[types.UID]uint64
|
||||||
|
podDeletionSafety PodDeletionSafetyProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
|
// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
|
||||||
|
@ -77,6 +78,12 @@ type PodStatusProvider interface {
|
||||||
GetPodStatus(uid types.UID) (v1.PodStatus, bool)
|
GetPodStatus(uid types.UID) (v1.PodStatus, bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// An object which provides guarantees that a pod can be saftely deleted.
|
||||||
|
type PodDeletionSafetyProvider interface {
|
||||||
|
// A function which returns true if the pod can safely be deleted
|
||||||
|
OkToDeletePod(pod *v1.Pod) bool
|
||||||
|
}
|
||||||
|
|
||||||
// Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
|
// Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
|
||||||
// the latest v1.PodStatus. It also syncs updates back to the API server.
|
// the latest v1.PodStatus. It also syncs updates back to the API server.
|
||||||
type Manager interface {
|
type Manager interface {
|
||||||
|
@ -103,13 +110,14 @@ type Manager interface {
|
||||||
|
|
||||||
const syncPeriod = 10 * time.Second
|
const syncPeriod = 10 * time.Second
|
||||||
|
|
||||||
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager) Manager {
|
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {
|
||||||
return &manager{
|
return &manager{
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
podManager: podManager,
|
podManager: podManager,
|
||||||
podStatuses: make(map[types.UID]versionedPodStatus),
|
podStatuses: make(map[types.UID]versionedPodStatus),
|
||||||
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
|
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
|
||||||
apiStatusVersions: make(map[types.UID]uint64),
|
apiStatusVersions: make(map[types.UID]uint64),
|
||||||
|
podDeletionSafety: podDeletionSafety,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,7 +389,7 @@ func (m *manager) syncBatch() {
|
||||||
}
|
}
|
||||||
syncedUID = mirrorUID
|
syncedUID = mirrorUID
|
||||||
}
|
}
|
||||||
if m.needsUpdate(syncedUID, status) {
|
if m.needsUpdate(syncedUID, status) || m.couldBeDeleted(uid, status.status) {
|
||||||
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
|
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
|
||||||
} else if m.needsReconcile(uid, status.status) {
|
} else if m.needsReconcile(uid, status.status) {
|
||||||
// Delete the apiStatusVersions here to force an update on the pod status
|
// Delete the apiStatusVersions here to force an update on the pod status
|
||||||
|
@ -434,11 +442,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
|
||||||
// We don't handle graceful deletion of mirror pods.
|
// We don't handle graceful deletion of mirror pods.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if pod.DeletionTimestamp == nil {
|
if !m.podDeletionSafety.OkToDeletePod(pod) {
|
||||||
return
|
|
||||||
}
|
|
||||||
if !notRunning(pod.Status.ContainerStatuses) {
|
|
||||||
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
deleteOptions := metav1.NewDeleteOptions(0)
|
deleteOptions := metav1.NewDeleteOptions(0)
|
||||||
|
@ -463,6 +467,15 @@ func (m *manager) needsUpdate(uid types.UID, status versionedPodStatus) bool {
|
||||||
return !ok || latest < status.version
|
return !ok || latest < status.version
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *manager) couldBeDeleted(uid types.UID, status v1.PodStatus) bool {
|
||||||
|
// The pod could be a static pod, so we should translate first.
|
||||||
|
pod, ok := m.podManager.GetPodByUID(uid)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return !kubepod.IsMirrorPod(pod) && m.podDeletionSafety.OkToDeletePod(pod)
|
||||||
|
}
|
||||||
|
|
||||||
// needsReconcile compares the given status with the status in the pod manager (which
|
// needsReconcile compares the given status with the status in the pod manager (which
|
||||||
// in fact comes from apiserver), returns whether the status needs to be reconciled with
|
// in fact comes from apiserver), returns whether the status needs to be reconciled with
|
||||||
// the apiserver. Now when pod status is inconsistent between apiserver and kubelet,
|
// the apiserver. Now when pod status is inconsistent between apiserver and kubelet,
|
||||||
|
@ -563,17 +576,6 @@ func normalizeStatus(pod *v1.Pod, status *v1.PodStatus) *v1.PodStatus {
|
||||||
return status
|
return status
|
||||||
}
|
}
|
||||||
|
|
||||||
// notRunning returns true if every status is terminated or waiting, or the status list
|
|
||||||
// is empty.
|
|
||||||
func notRunning(statuses []v1.ContainerStatus) bool {
|
|
||||||
for _, status := range statuses {
|
|
||||||
if status.State.Terminated == nil && status.State.Waiting == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func copyStatus(source *v1.PodStatus) (v1.PodStatus, error) {
|
func copyStatus(source *v1.PodStatus) (v1.PodStatus, error) {
|
||||||
clone, err := api.Scheme.DeepCopy(source)
|
clone, err := api.Scheme.DeepCopy(source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -39,6 +39,7 @@ import (
|
||||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||||
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
|
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
|
||||||
kubesecret "k8s.io/kubernetes/pkg/kubelet/secret"
|
kubesecret "k8s.io/kubernetes/pkg/kubelet/secret"
|
||||||
|
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
|
||||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -74,7 +75,7 @@ func (m *manager) testSyncBatch() {
|
||||||
func newTestManager(kubeClient clientset.Interface) *manager {
|
func newTestManager(kubeClient clientset.Interface) *manager {
|
||||||
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager())
|
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), kubesecret.NewFakeManager())
|
||||||
podManager.AddPod(getTestPod())
|
podManager.AddPod(getTestPod())
|
||||||
return NewManager(kubeClient, podManager).(*manager)
|
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}).(*manager)
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateRandomMessage() string {
|
func generateRandomMessage() string {
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
package(default_visibility = ["//visibility:public"])
|
||||||
|
|
||||||
|
licenses(["notice"])
|
||||||
|
|
||||||
|
load(
|
||||||
|
"@io_bazel_rules_go//go:def.bzl",
|
||||||
|
"go_library",
|
||||||
|
)
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["fake_pod_deletion_safety.go"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
deps = [
|
||||||
|
"//pkg/api/v1:go_default_library",
|
||||||
|
"//pkg/kubelet/pod:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "package-srcs",
|
||||||
|
srcs = glob(["**"]),
|
||||||
|
tags = ["automanaged"],
|
||||||
|
visibility = ["//visibility:private"],
|
||||||
|
)
|
||||||
|
|
||||||
|
filegroup(
|
||||||
|
name = "all-srcs",
|
||||||
|
srcs = [":package-srcs"],
|
||||||
|
tags = ["automanaged"],
|
||||||
|
)
|
|
@ -0,0 +1,28 @@
|
||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package testing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
|
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FakePodDeletionSafetyProvider struct{}
|
||||||
|
|
||||||
|
func (f *FakePodDeletionSafetyProvider) OkToDeletePod(pod *v1.Pod) bool {
|
||||||
|
return !kubepod.IsMirrorPod(pod) && pod.DeletionTimestamp != nil
|
||||||
|
}
|
|
@ -18,6 +18,7 @@ go_library(
|
||||||
"//pkg/kubelet/config:go_default_library",
|
"//pkg/kubelet/config:go_default_library",
|
||||||
"//pkg/kubelet/container:go_default_library",
|
"//pkg/kubelet/container:go_default_library",
|
||||||
"//pkg/kubelet/pod:go_default_library",
|
"//pkg/kubelet/pod:go_default_library",
|
||||||
|
"//pkg/kubelet/status:go_default_library",
|
||||||
"//pkg/kubelet/util/format:go_default_library",
|
"//pkg/kubelet/util/format:go_default_library",
|
||||||
"//pkg/kubelet/volumemanager/cache:go_default_library",
|
"//pkg/kubelet/volumemanager/cache:go_default_library",
|
||||||
"//pkg/kubelet/volumemanager/populator:go_default_library",
|
"//pkg/kubelet/volumemanager/populator:go_default_library",
|
||||||
|
@ -50,6 +51,8 @@ go_test(
|
||||||
"//pkg/kubelet/pod:go_default_library",
|
"//pkg/kubelet/pod:go_default_library",
|
||||||
"//pkg/kubelet/pod/testing:go_default_library",
|
"//pkg/kubelet/pod/testing:go_default_library",
|
||||||
"//pkg/kubelet/secret:go_default_library",
|
"//pkg/kubelet/secret:go_default_library",
|
||||||
|
"//pkg/kubelet/status:go_default_library",
|
||||||
|
"//pkg/kubelet/status/testing:go_default_library",
|
||||||
"//pkg/util/mount:go_default_library",
|
"//pkg/util/mount:go_default_library",
|
||||||
"//pkg/volume:go_default_library",
|
"//pkg/volume:go_default_library",
|
||||||
"//pkg/volume/testing:go_default_library",
|
"//pkg/volume/testing:go_default_library",
|
||||||
|
|
|
@ -17,6 +17,7 @@ go_library(
|
||||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||||
"//pkg/kubelet/container:go_default_library",
|
"//pkg/kubelet/container:go_default_library",
|
||||||
"//pkg/kubelet/pod:go_default_library",
|
"//pkg/kubelet/pod:go_default_library",
|
||||||
|
"//pkg/kubelet/status:go_default_library",
|
||||||
"//pkg/kubelet/util/format:go_default_library",
|
"//pkg/kubelet/util/format:go_default_library",
|
||||||
"//pkg/kubelet/volumemanager/cache:go_default_library",
|
"//pkg/kubelet/volumemanager/cache:go_default_library",
|
||||||
"//pkg/volume:go_default_library",
|
"//pkg/volume:go_default_library",
|
||||||
|
|
|
@ -35,6 +35,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/pod"
|
"k8s.io/kubernetes/pkg/kubelet/pod"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
|
@ -70,6 +71,7 @@ func NewDesiredStateOfWorldPopulator(
|
||||||
loopSleepDuration time.Duration,
|
loopSleepDuration time.Duration,
|
||||||
getPodStatusRetryDuration time.Duration,
|
getPodStatusRetryDuration time.Duration,
|
||||||
podManager pod.Manager,
|
podManager pod.Manager,
|
||||||
|
podStatusProvider status.PodStatusProvider,
|
||||||
desiredStateOfWorld cache.DesiredStateOfWorld,
|
desiredStateOfWorld cache.DesiredStateOfWorld,
|
||||||
kubeContainerRuntime kubecontainer.Runtime,
|
kubeContainerRuntime kubecontainer.Runtime,
|
||||||
keepTerminatedPodVolumes bool) DesiredStateOfWorldPopulator {
|
keepTerminatedPodVolumes bool) DesiredStateOfWorldPopulator {
|
||||||
|
@ -78,6 +80,7 @@ func NewDesiredStateOfWorldPopulator(
|
||||||
loopSleepDuration: loopSleepDuration,
|
loopSleepDuration: loopSleepDuration,
|
||||||
getPodStatusRetryDuration: getPodStatusRetryDuration,
|
getPodStatusRetryDuration: getPodStatusRetryDuration,
|
||||||
podManager: podManager,
|
podManager: podManager,
|
||||||
|
podStatusProvider: podStatusProvider,
|
||||||
desiredStateOfWorld: desiredStateOfWorld,
|
desiredStateOfWorld: desiredStateOfWorld,
|
||||||
pods: processedPods{
|
pods: processedPods{
|
||||||
processedPods: make(map[volumetypes.UniquePodName]bool)},
|
processedPods: make(map[volumetypes.UniquePodName]bool)},
|
||||||
|
@ -91,6 +94,7 @@ type desiredStateOfWorldPopulator struct {
|
||||||
loopSleepDuration time.Duration
|
loopSleepDuration time.Duration
|
||||||
getPodStatusRetryDuration time.Duration
|
getPodStatusRetryDuration time.Duration
|
||||||
podManager pod.Manager
|
podManager pod.Manager
|
||||||
|
podStatusProvider status.PodStatusProvider
|
||||||
desiredStateOfWorld cache.DesiredStateOfWorld
|
desiredStateOfWorld cache.DesiredStateOfWorld
|
||||||
pods processedPods
|
pods processedPods
|
||||||
kubeContainerRuntime kubecontainer.Runtime
|
kubeContainerRuntime kubecontainer.Runtime
|
||||||
|
@ -134,15 +138,30 @@ func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func isPodTerminated(pod *v1.Pod) bool {
|
func (dswp *desiredStateOfWorldPopulator) isPodTerminated(pod *v1.Pod) bool {
|
||||||
return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded
|
podStatus, found := dswp.podStatusProvider.GetPodStatus(pod.UID)
|
||||||
|
if !found {
|
||||||
|
podStatus = pod.Status
|
||||||
|
}
|
||||||
|
return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses))
|
||||||
|
}
|
||||||
|
|
||||||
|
// notRunning returns true if every status is terminated or waiting, or the status list
|
||||||
|
// is empty.
|
||||||
|
func notRunning(statuses []v1.ContainerStatus) bool {
|
||||||
|
for _, status := range statuses {
|
||||||
|
if status.State.Terminated == nil && status.State.Waiting == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate through all pods and add to desired state of world if they don't
|
// Iterate through all pods and add to desired state of world if they don't
|
||||||
// exist but should
|
// exist but should
|
||||||
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
|
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
|
||||||
for _, pod := range dswp.podManager.GetPods() {
|
for _, pod := range dswp.podManager.GetPods() {
|
||||||
if isPodTerminated(pod) {
|
if dswp.isPodTerminated(pod) {
|
||||||
// Do not (re)add volumes for terminated pods
|
// Do not (re)add volumes for terminated pods
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -160,7 +179,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
|
||||||
pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
|
pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
|
||||||
if podExists {
|
if podExists {
|
||||||
// Skip running pods
|
// Skip running pods
|
||||||
if !isPodTerminated(pod) {
|
if !dswp.isPodTerminated(pod) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if dswp.keepTerminatedPodVolumes {
|
if dswp.keepTerminatedPodVolumes {
|
||||||
|
|
|
@ -33,6 +33,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/kubelet/container"
|
"k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/pod"
|
"k8s.io/kubernetes/pkg/kubelet/pod"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
|
"k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
|
||||||
|
@ -151,6 +152,7 @@ func NewVolumeManager(
|
||||||
controllerAttachDetachEnabled bool,
|
controllerAttachDetachEnabled bool,
|
||||||
nodeName k8stypes.NodeName,
|
nodeName k8stypes.NodeName,
|
||||||
podManager pod.Manager,
|
podManager pod.Manager,
|
||||||
|
podStatusProvider status.PodStatusProvider,
|
||||||
kubeClient clientset.Interface,
|
kubeClient clientset.Interface,
|
||||||
volumePluginMgr *volume.VolumePluginMgr,
|
volumePluginMgr *volume.VolumePluginMgr,
|
||||||
kubeContainerRuntime kubecontainer.Runtime,
|
kubeContainerRuntime kubecontainer.Runtime,
|
||||||
|
@ -191,6 +193,7 @@ func NewVolumeManager(
|
||||||
desiredStateOfWorldPopulatorLoopSleepPeriod,
|
desiredStateOfWorldPopulatorLoopSleepPeriod,
|
||||||
desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
|
desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
|
||||||
podManager,
|
podManager,
|
||||||
|
podStatusProvider,
|
||||||
vm.desiredStateOfWorld,
|
vm.desiredStateOfWorld,
|
||||||
kubeContainerRuntime,
|
kubeContainerRuntime,
|
||||||
keepTerminatedPodVolumes)
|
keepTerminatedPodVolumes)
|
||||||
|
|
|
@ -36,6 +36,8 @@ import (
|
||||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||||
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
|
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/secret"
|
"k8s.io/kubernetes/pkg/kubelet/secret"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||||
|
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
|
||||||
"k8s.io/kubernetes/pkg/util/mount"
|
"k8s.io/kubernetes/pkg/util/mount"
|
||||||
"k8s.io/kubernetes/pkg/volume"
|
"k8s.io/kubernetes/pkg/volume"
|
||||||
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
volumetest "k8s.io/kubernetes/pkg/volume/testing"
|
||||||
|
@ -187,11 +189,13 @@ func newTestVolumeManager(
|
||||||
fakeRecorder := &record.FakeRecorder{}
|
fakeRecorder := &record.FakeRecorder{}
|
||||||
plugMgr := &volume.VolumePluginMgr{}
|
plugMgr := &volume.VolumePluginMgr{}
|
||||||
plugMgr.InitPlugins([]volume.VolumePlugin{plug}, volumetest.NewFakeVolumeHost(tmpDir, kubeClient, nil))
|
plugMgr.InitPlugins([]volume.VolumePlugin{plug}, volumetest.NewFakeVolumeHost(tmpDir, kubeClient, nil))
|
||||||
|
statusManager := status.NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{})
|
||||||
|
|
||||||
vm, err := NewVolumeManager(
|
vm, err := NewVolumeManager(
|
||||||
true,
|
true,
|
||||||
testHostname,
|
testHostname,
|
||||||
podManager,
|
podManager,
|
||||||
|
statusManager,
|
||||||
kubeClient,
|
kubeClient,
|
||||||
plugMgr,
|
plugMgr,
|
||||||
&containertest.FakeRuntime{},
|
&containertest.FakeRuntime{},
|
||||||
|
|
Loading…
Reference in New Issue