mirror of https://github.com/k3s-io/k3s
detach the volume when pod is terminated
Make sure volume is detached when pod is terminated because of any reason and not deleted from api server.pull/6/head
parent
7408f6b3a7
commit
9a1a9cbe08
|
@ -569,8 +569,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
|
|||
cloud,
|
||||
ProbeAttachableVolumePlugins(s.VolumeConfiguration),
|
||||
s.DisableAttachDetachReconcilerSync,
|
||||
s.ReconcilerSyncLoopPeriod.Duration,
|
||||
)
|
||||
s.ReconcilerSyncLoopPeriod.Duration)
|
||||
if attachDetachControllerErr != nil {
|
||||
return fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
|
||||
}
|
||||
|
|
|
@ -208,6 +208,7 @@ CPU_CFS_QUOTA=${CPU_CFS_QUOTA:-true}
|
|||
ENABLE_HOSTPATH_PROVISIONER=${ENABLE_HOSTPATH_PROVISIONER:-"false"}
|
||||
CLAIM_BINDER_SYNC_PERIOD=${CLAIM_BINDER_SYNC_PERIOD:-"15s"} # current k8s default
|
||||
ENABLE_CONTROLLER_ATTACH_DETACH=${ENABLE_CONTROLLER_ATTACH_DETACH:-"true"} # current default
|
||||
KEEP_TERMINATED_POD_VOLUMES=${KEEP_TERMINATED_POD_VOLUMES:-"true"}
|
||||
# This is the default dir and filename where the apiserver will generate a self-signed cert
|
||||
# which should be able to be used as the CA to verify itself
|
||||
CERT_DIR=${CERT_DIR:-"/var/run/kubernetes"}
|
||||
|
@ -638,7 +639,7 @@ function start_kubelet {
|
|||
--enable-controller-attach-detach="${ENABLE_CONTROLLER_ATTACH_DETACH}" \
|
||||
--cgroups-per-qos=${CGROUPS_PER_QOS} \
|
||||
--cgroup-driver=${CGROUP_DRIVER} \
|
||||
--keep-terminated-pod-volumes=true \
|
||||
--keep-terminated-pod-volumes=${KEEP_TERMINATED_POD_VOLUMES} \
|
||||
--eviction-hard=${EVICTION_HARD} \
|
||||
--eviction-soft=${EVICTION_SOFT} \
|
||||
--eviction-pressure-transition-period=${EVICTION_PRESSURE_TRANSITION_PERIOD} \
|
||||
|
|
|
@ -385,8 +385,13 @@ func (adc *attachDetachController) podAdd(obj interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
util.ProcessPodVolumes(pod, true, /* addVolumes */
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||
if volumehelper.IsPodTerminated(pod, pod.Status) {
|
||||
util.ProcessPodVolumes(pod, false, /* addVolumes */
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||
} else {
|
||||
util.ProcessPodVolumes(pod, true, /* addVolumes */
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||
}
|
||||
}
|
||||
|
||||
// GetDesiredStateOfWorld returns desired state of world associated with controller
|
||||
|
@ -395,8 +400,23 @@ func (adc *attachDetachController) GetDesiredStateOfWorld() cache.DesiredStateOf
|
|||
}
|
||||
|
||||
func (adc *attachDetachController) podUpdate(oldObj, newObj interface{}) {
|
||||
// The flow for update is the same as add.
|
||||
adc.podAdd(newObj)
|
||||
pod, ok := newObj.(*v1.Pod)
|
||||
if pod == nil || !ok {
|
||||
return
|
||||
}
|
||||
if pod.Spec.NodeName == "" {
|
||||
// Ignore pods without NodeName, indicating they are not scheduled.
|
||||
return
|
||||
}
|
||||
|
||||
addPodFlag := true
|
||||
|
||||
if volumehelper.IsPodTerminated(pod, pod.Status) {
|
||||
addPodFlag = false
|
||||
}
|
||||
|
||||
util.ProcessPodVolumes(pod, addPodFlag, /* addVolumes */
|
||||
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
|
||||
}
|
||||
|
||||
func (adc *attachDetachController) podDelete(obj interface{}) {
|
||||
|
|
|
@ -213,6 +213,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
|
|||
plugins,
|
||||
false,
|
||||
time.Second*1)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
|
||||
}
|
||||
|
|
|
@ -127,11 +127,13 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
|
|||
glog.Errorf("podLister Get failed for pod %q (UID %q) with %v", dswPodKey, dswPodUID, err)
|
||||
continue
|
||||
default:
|
||||
informerPodUID := volumehelper.GetUniquePodName(informerPod)
|
||||
// Check whether the unique identifier of the pod from dsw matches the one retrieved from pod informer
|
||||
if informerPodUID == dswPodUID {
|
||||
glog.V(10).Infof("Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
|
||||
continue
|
||||
if !volumehelper.IsPodTerminated(informerPod, informerPod.Status) {
|
||||
informerPodUID := volumehelper.GetUniquePodName(informerPod)
|
||||
// Check whether the unique identifier of the pod from dsw matches the one retrieved from pod informer
|
||||
if informerPodUID == dswPodUID {
|
||||
glog.V(10).Infof("Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -149,6 +149,77 @@ func TestPodDeletionWithDswp(t *testing.T) {
|
|||
close(stopCh)
|
||||
}
|
||||
|
||||
func TestPodUpdateWithWithADC(t *testing.T) {
|
||||
_, server := framework.RunAMaster(nil)
|
||||
defer server.Close()
|
||||
namespaceName := "test-pod-update"
|
||||
|
||||
node := &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node-sandbox",
|
||||
Annotations: map[string]string{
|
||||
volumehelper.ControllerManagedAttachAnnotation: "true",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ns := framework.CreateTestingNamespace(namespaceName, server, t)
|
||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||
|
||||
testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod)
|
||||
|
||||
pod := fakePodWithVol(namespaceName)
|
||||
podStopCh := make(chan struct{})
|
||||
|
||||
if _, err := testClient.Core().Nodes().Create(node); err != nil {
|
||||
t.Fatalf("Failed to created node : %v", err)
|
||||
}
|
||||
|
||||
go informers.Core().V1().Nodes().Informer().Run(podStopCh)
|
||||
|
||||
if _, err := testClient.Core().Pods(ns.Name).Create(pod); err != nil {
|
||||
t.Errorf("Failed to create pod : %v", err)
|
||||
}
|
||||
|
||||
podInformer := informers.Core().V1().Pods().Informer()
|
||||
go podInformer.Run(podStopCh)
|
||||
|
||||
// start controller loop
|
||||
stopCh := make(chan struct{})
|
||||
go informers.Core().V1().PersistentVolumeClaims().Informer().Run(stopCh)
|
||||
go informers.Core().V1().PersistentVolumes().Informer().Run(stopCh)
|
||||
go ctrl.Run(stopCh)
|
||||
|
||||
waitToObservePods(t, podInformer, 1)
|
||||
podKey, err := cache.MetaNamespaceKeyFunc(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
|
||||
}
|
||||
|
||||
_, _, err = podInformer.GetStore().GetByKey(podKey)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Pod not found in Pod Informer cache : %v", err)
|
||||
}
|
||||
|
||||
waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
|
||||
|
||||
pod.Status.Phase = v1.PodSucceeded
|
||||
|
||||
if _, err := testClient.Core().Pods(ns.Name).UpdateStatus(pod); err != nil {
|
||||
t.Errorf("Failed to update pod : %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(20 * time.Second)
|
||||
podsToAdd := ctrl.GetDesiredStateOfWorld().GetPodToAdd()
|
||||
if len(podsToAdd) != 0 {
|
||||
t.Fatalf("All pods should have been removed")
|
||||
}
|
||||
|
||||
close(podStopCh)
|
||||
close(stopCh)
|
||||
}
|
||||
|
||||
// wait for the podInformer to observe the pods. Call this function before
|
||||
// running the RC manager to prevent the rc manager from creating new pods
|
||||
// rather than adopting the existing ones.
|
||||
|
@ -213,8 +284,8 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy
|
|||
cloud,
|
||||
plugins,
|
||||
false,
|
||||
time.Second*5,
|
||||
)
|
||||
time.Second*5)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating AttachDetach : %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue