mirror of https://github.com/k3s-io/k3s
Namespace must estimate and requeue content deletions
parent
61c7beb51f
commit
62b6ca1643
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||||
package namespacecontroller
|
package namespacecontroller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
@ -41,7 +42,8 @@ type NamespaceController struct {
|
||||||
|
|
||||||
// NewNamespaceController creates a new NamespaceController
|
// NewNamespaceController creates a new NamespaceController
|
||||||
func NewNamespaceController(kubeClient client.Interface, resyncPeriod time.Duration) *NamespaceController {
|
func NewNamespaceController(kubeClient client.Interface, resyncPeriod time.Duration) *NamespaceController {
|
||||||
_, controller := framework.NewInformer(
|
var controller *framework.Controller
|
||||||
|
_, controller = framework.NewInformer(
|
||||||
&cache.ListWatch{
|
&cache.ListWatch{
|
||||||
ListFunc: func() (runtime.Object, error) {
|
ListFunc: func() (runtime.Object, error) {
|
||||||
return kubeClient.Namespaces().List(labels.Everything(), fields.Everything())
|
return kubeClient.Namespaces().List(labels.Everything(), fields.Everything())
|
||||||
|
@ -55,16 +57,41 @@ func NewNamespaceController(kubeClient client.Interface, resyncPeriod time.Durat
|
||||||
framework.ResourceEventHandlerFuncs{
|
framework.ResourceEventHandlerFuncs{
|
||||||
AddFunc: func(obj interface{}) {
|
AddFunc: func(obj interface{}) {
|
||||||
namespace := obj.(*api.Namespace)
|
namespace := obj.(*api.Namespace)
|
||||||
err := syncNamespace(kubeClient, *namespace)
|
if err := syncNamespace(kubeClient, *namespace); err != nil {
|
||||||
if err != nil {
|
if estimate, ok := err.(*contentRemainingError); ok {
|
||||||
glog.Error(err)
|
go func() {
|
||||||
|
// Estimate is the aggregate total of TerminationGracePeriodSeconds, which defaults to 30s
|
||||||
|
// for pods. However, most processes will terminate faster - within a few seconds, probably
|
||||||
|
// with a peak within 5-10s. So this division is a heuristic that avoids waiting the full
|
||||||
|
// duration when in many cases things complete more quickly. The extra second added is to
|
||||||
|
// ensure we never wait 0 seconds.
|
||||||
|
t := estimate.Estimate/2 + 1
|
||||||
|
glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t)
|
||||||
|
time.Sleep(time.Duration(t) * time.Second)
|
||||||
|
if err := controller.Requeue(namespace); err != nil {
|
||||||
|
util.HandleError(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
util.HandleError(err)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
namespace := newObj.(*api.Namespace)
|
namespace := newObj.(*api.Namespace)
|
||||||
err := syncNamespace(kubeClient, *namespace)
|
if err := syncNamespace(kubeClient, *namespace); err != nil {
|
||||||
if err != nil {
|
if estimate, ok := err.(*contentRemainingError); ok {
|
||||||
glog.Error(err)
|
go func() {
|
||||||
|
t := estimate.Estimate/2 + 1
|
||||||
|
glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t)
|
||||||
|
time.Sleep(time.Duration(t) * time.Second)
|
||||||
|
if err := controller.Requeue(namespace); err != nil {
|
||||||
|
util.HandleError(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
util.HandleError(err)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -114,45 +141,56 @@ func finalize(kubeClient client.Interface, namespace api.Namespace) (*api.Namesp
|
||||||
return kubeClient.Namespaces().Finalize(&namespaceFinalize)
|
return kubeClient.Namespaces().Finalize(&namespaceFinalize)
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteAllContent will delete all content known to the system in a namespace
|
type contentRemainingError struct {
|
||||||
func deleteAllContent(kubeClient client.Interface, namespace string) (err error) {
|
Estimate int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *contentRemainingError) Error() string {
|
||||||
|
return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate)
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate
|
||||||
|
// of the time remaining before the remaining resources are deleted. If estimate > 0 not all resources
|
||||||
|
// are guaranteed to be gone.
|
||||||
|
func deleteAllContent(kubeClient client.Interface, namespace string, before util.Time) (estimate int64, err error) {
|
||||||
err = deleteServiceAccounts(kubeClient, namespace)
|
err = deleteServiceAccounts(kubeClient, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return estimate, err
|
||||||
}
|
}
|
||||||
err = deleteServices(kubeClient, namespace)
|
err = deleteServices(kubeClient, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return estimate, err
|
||||||
}
|
}
|
||||||
err = deleteReplicationControllers(kubeClient, namespace)
|
err = deleteReplicationControllers(kubeClient, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return estimate, err
|
||||||
}
|
}
|
||||||
err = deletePods(kubeClient, namespace)
|
estimate, err = deletePods(kubeClient, namespace, before)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return estimate, err
|
||||||
}
|
}
|
||||||
err = deleteSecrets(kubeClient, namespace)
|
err = deleteSecrets(kubeClient, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return estimate, err
|
||||||
}
|
}
|
||||||
err = deletePersistentVolumeClaims(kubeClient, namespace)
|
err = deletePersistentVolumeClaims(kubeClient, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return estimate, err
|
||||||
}
|
}
|
||||||
err = deleteLimitRanges(kubeClient, namespace)
|
err = deleteLimitRanges(kubeClient, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return estimate, err
|
||||||
}
|
}
|
||||||
err = deleteResourceQuotas(kubeClient, namespace)
|
err = deleteResourceQuotas(kubeClient, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return estimate, err
|
||||||
}
|
}
|
||||||
err = deleteEvents(kubeClient, namespace)
|
err = deleteEvents(kubeClient, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return estimate, err
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
|
return estimate, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncNamespace makes namespace life-cycle decisions
|
// syncNamespace makes namespace life-cycle decisions
|
||||||
|
@ -160,6 +198,7 @@ func syncNamespace(kubeClient client.Interface, namespace api.Namespace) (err er
|
||||||
if namespace.DeletionTimestamp == nil {
|
if namespace.DeletionTimestamp == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
glog.V(4).Infof("Syncing namespace %s", namespace.Name)
|
||||||
|
|
||||||
// if there is a deletion timestamp, and the status is not terminating, then update status
|
// if there is a deletion timestamp, and the status is not terminating, then update status
|
||||||
if !namespace.DeletionTimestamp.IsZero() && namespace.Status.Phase != api.NamespaceTerminating {
|
if !namespace.DeletionTimestamp.IsZero() && namespace.Status.Phase != api.NamespaceTerminating {
|
||||||
|
@ -185,10 +224,13 @@ func syncNamespace(kubeClient client.Interface, namespace api.Namespace) (err er
|
||||||
}
|
}
|
||||||
|
|
||||||
// there may still be content for us to remove
|
// there may still be content for us to remove
|
||||||
err = deleteAllContent(kubeClient, namespace.Name)
|
estimate, err := deleteAllContent(kubeClient, namespace.Name, *namespace.DeletionTimestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if estimate > 0 {
|
||||||
|
return &contentRemainingError{estimate}
|
||||||
|
}
|
||||||
|
|
||||||
// we have removed content, so mark it finalized by us
|
// we have removed content, so mark it finalized by us
|
||||||
result, err := finalize(kubeClient, namespace)
|
result, err := finalize(kubeClient, namespace)
|
||||||
|
@ -277,18 +319,33 @@ func deleteReplicationControllers(kubeClient client.Interface, ns string) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func deletePods(kubeClient client.Interface, ns string) error {
|
func deletePods(kubeClient client.Interface, ns string, before util.Time) (int64, error) {
|
||||||
items, err := kubeClient.Pods(ns).List(labels.Everything(), fields.Everything())
|
items, err := kubeClient.Pods(ns).List(labels.Everything(), fields.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
expired := util.Now().After(before.Time)
|
||||||
|
var deleteOptions *api.DeleteOptions
|
||||||
|
if expired {
|
||||||
|
deleteOptions = api.NewDeleteOptions(0)
|
||||||
|
}
|
||||||
|
estimate := int64(0)
|
||||||
for i := range items.Items {
|
for i := range items.Items {
|
||||||
err := kubeClient.Pods(ns).Delete(items.Items[i].Name, nil)
|
if items.Items[i].Spec.TerminationGracePeriodSeconds != nil {
|
||||||
|
grace := *items.Items[i].Spec.TerminationGracePeriodSeconds
|
||||||
|
if grace > estimate {
|
||||||
|
estimate = grace
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := kubeClient.Pods(ns).Delete(items.Items[i].Name, deleteOptions)
|
||||||
if err != nil && !errors.IsNotFound(err) {
|
if err != nil && !errors.IsNotFound(err) {
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
if expired {
|
||||||
|
estimate = 0
|
||||||
|
}
|
||||||
|
return estimate, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteEvents(kubeClient client.Interface, ns string) error {
|
func deleteEvents(kubeClient client.Interface, ns string) error {
|
||||||
|
|
Loading…
Reference in New Issue