Merge pull request #20076 from derekwaynecarr/namespace_controller_workers

Auto commit by PR queue bot
k8s-merge-robot 2016-02-10 00:55:57 -08:00
commit 71b6b81102
8 changed files with 468 additions and 549 deletions

View File

@@ -241,7 +241,8 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
        glog.Fatalf("Failed to get supported resources from server: %v", err)
    }
-   namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "namespace-controller")), versions, s.NamespaceSyncPeriod).Run()
+   namespaceController := namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "namespace-controller")), &unversioned.APIVersions{}, s.NamespaceSyncPeriod)
+   go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop)
    groupVersion := "extensions/v1beta1"
    resources, found := resourceMap[groupVersion]
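The new call site starts the controller asynchronously with a worker count and a never-closing stop channel instead of the old self-managed `Run()`. For readers unfamiliar with that pattern, here is a minimal self-contained sketch; the `dummyController` type and all names in it are illustrative, not from this PR:

```go
package main

import (
	"fmt"
	"time"
)

type dummyController struct{}

// Run starts the requested number of worker loops and blocks until stopCh
// is closed, mirroring the Run(workers, stopCh) signature adopted above.
func (dummyController) Run(workers int, stopCh <-chan struct{}) {
	for i := 0; i < workers; i++ {
		go func(id int) {
			for {
				select {
				case <-stopCh:
					return
				default:
					fmt.Printf("worker %d: sync pass\n", id)
					time.Sleep(100 * time.Millisecond)
				}
			}
		}(i)
	}
	<-stopCh
}

func main() {
	c := dummyController{}
	stop := make(chan struct{})
	// Launch on a goroutine, as the hunk does with wait.NeverStop.
	go c.Run(2, stop)
	time.Sleep(300 * time.Millisecond)
	close(stop)
}
```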

View File

@@ -44,6 +44,7 @@ type CMServer struct {
    ConcurrentJobSyncs           int
    ConcurrentResourceQuotaSyncs int
    ConcurrentDeploymentSyncs    int
+   ConcurrentNamespaceSyncs     int
    ServiceSyncPeriod            time.Duration
    NodeSyncPeriod               time.Duration
    ResourceQuotaSyncPeriod      time.Duration
@@ -105,6 +106,7 @@ func NewCMServer() *CMServer {
        ConcurrentJobSyncs:           5,
        ConcurrentResourceQuotaSyncs: 5,
        ConcurrentDeploymentSyncs:    5,
+       ConcurrentNamespaceSyncs:     2,
        ServiceSyncPeriod:            5 * time.Minute,
        NodeSyncPeriod:               10 * time.Second,
        ResourceQuotaSyncPeriod:      5 * time.Minute,
@@ -143,10 +145,11 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
    fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
    fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
    fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
-   fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load")
-   fs.IntVar(&s.ConcurrentRSSyncs, "concurrent-replicaset-syncs", s.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load")
+   fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
+   fs.IntVar(&s.ConcurrentRSSyncs, "concurrent-replicaset-syncs", s.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
    fs.IntVar(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load")
-   fs.IntVar(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load")
+   fs.IntVar(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load")
+   fs.IntVar(&s.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", s.ConcurrentNamespaceSyncs, "The number of namespace objects that are allowed to sync concurrently. Larger number = more responsive namespace termination, but more CPU (and network) load")
    fs.DurationVar(&s.ServiceSyncPeriod, "service-sync-period", s.ServiceSyncPeriod, "The period for syncing services with their external load balancers")
    fs.DurationVar(&s.NodeSyncPeriod, "node-sync-period", s.NodeSyncPeriod, ""+
        "The period for syncing nodes from cloudprovider. Longer periods will result in "+

View File

@@ -197,7 +197,7 @@ func (s *CMServer) Run(_ []string) error {
    }
    namespaceController := namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "namespace-controller")), &unversioned.APIVersions{}, s.NamespaceSyncPeriod)
-   namespaceController.Run()
+   namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop)
    groupVersion := "extensions/v1beta1"
    resources, found := resourceMap[groupVersion]

View File

@@ -61,11 +61,12 @@ kube-controller-manager
    --cloud-provider="": The provider for cloud services. Empty string for no provider.
    --cluster-cidr=<nil>: CIDR Range for Pods in cluster.
    --cluster-name="kubernetes": The instance prefix for the cluster
-   --concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load
+   --concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load
    --concurrent-endpoint-syncs=5: The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load
-   --concurrent-replicaset-syncs=5: The number of replica sets that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load
+   --concurrent-namespace-syncs=2: The number of namespace objects that are allowed to sync concurrently. Larger number = more responsive namespace termination, but more CPU (and network) load
+   --concurrent-replicaset-syncs=5: The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load
    --concurrent-resource-quota-syncs=5: The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load
-   --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load
+   --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load
    --deleting-pods-burst=10: Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.
    --deleting-pods-qps=0.1: Number of nodes per second on which pods are deleted in case of node failure.
    --deployment-controller-sync-period=30s: Period for syncing the deployments.
@@ -104,7 +105,7 @@ kube-controller-manager
    --terminated-pod-gc-threshold=12500: Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.
```
-###### Auto generated by spf13/cobra on 5-Feb-2016
+###### Auto generated by spf13/cobra on 8-Feb-2016
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@@ -49,6 +49,7 @@ concurrent-deployment-syncs
concurrent-endpoint-syncs
concurrent-replicaset-syncs
concurrent-resource-quota-syncs
+concurrent-namespace-syncs
config-sync-period
configure-cbr0
conntrack-max

View File

@@ -17,19 +17,18 @@ limitations under the License.
package namespace

import (
-   "fmt"
    "time"

    "k8s.io/kubernetes/pkg/api"
-   "k8s.io/kubernetes/pkg/api/errors"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/client/cache"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
-   extensions_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
+   "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/framework"
    "k8s.io/kubernetes/pkg/runtime"
    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
-   "k8s.io/kubernetes/pkg/util/sets"
+   "k8s.io/kubernetes/pkg/util/wait"
+   "k8s.io/kubernetes/pkg/util/workqueue"
    "k8s.io/kubernetes/pkg/watch"

    "github.com/golang/glog"
@@ -37,14 +36,29 @@ import (
// NamespaceController is responsible for performing actions dependent upon a namespace phase
type NamespaceController struct {
+   // client that purges namespace content, must have list/delete privileges on all content
+   kubeClient clientset.Interface
+   // store that holds the namespaces
+   store cache.Store
+   // controller that observes the namespaces
    controller *framework.Controller
-   StopEverything chan struct{}
+   // namespaces that have been queued up for processing by workers
+   queue *workqueue.Type
+   // list of versions to process
+   versions *unversioned.APIVersions
}

// NewNamespaceController creates a new NamespaceController
func NewNamespaceController(kubeClient clientset.Interface, versions *unversioned.APIVersions, resyncPeriod time.Duration) *NamespaceController {
-   var controller *framework.Controller
-   _, controller = framework.NewInformer(
+   // create the controller so we can inject the enqueue function
+   namespaceController := &NamespaceController{
+       kubeClient: kubeClient,
+       versions:   versions,
+       queue:      workqueue.New(),
+   }
+
+   // configure the backing store/controller
+   store, controller := framework.NewInformer(
    &cache.ListWatch{
        ListFunc: func(options api.ListOptions) (runtime.Object, error) {
            return kubeClient.Core().Namespaces().List(options)
@@ -54,530 +68,88 @@ func NewNamespaceController(kubeClient clientset.Interface, versions *unversioned.APIVersions, resyncPeriod time.Duration) *NamespaceController {
        },
    },
    &api.Namespace{},
-   // TODO: Can we have much longer period here?
    resyncPeriod,
    framework.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            namespace := obj.(*api.Namespace)
-           if err := syncNamespace(kubeClient, versions, namespace); err != nil {
-               if estimate, ok := err.(*contentRemainingError); ok {
-                   go func() {
-                       // Estimate is the aggregate total of TerminationGracePeriodSeconds, which defaults to 30s
-                       // for pods. However, most processes will terminate faster - within a few seconds, probably
-                       // with a peak within 5-10s. So this division is a heuristic that avoids waiting the full
-                       // duration when in many cases things complete more quickly. The extra second added is to
-                       // ensure we never wait 0 seconds.
-                       t := estimate.Estimate/2 + 1
-                       glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t)
-                       time.Sleep(time.Duration(t) * time.Second)
-                       if err := controller.Requeue(namespace); err != nil {
-                           utilruntime.HandleError(err)
-                       }
-                   }()
-                   return
-               }
-               utilruntime.HandleError(err)
-           }
+           namespaceController.enqueueNamespace(namespace)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            namespace := newObj.(*api.Namespace)
-           if err := syncNamespace(kubeClient, versions, namespace); err != nil {
-               if estimate, ok := err.(*contentRemainingError); ok {
-                   go func() {
-                       t := estimate.Estimate/2 + 1
-                       glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t)
-                       time.Sleep(time.Duration(t) * time.Second)
-                       if err := controller.Requeue(namespace); err != nil {
-                           utilruntime.HandleError(err)
-                       }
-                   }()
-                   return
-               }
-               utilruntime.HandleError(err)
-           }
+           namespaceController.enqueueNamespace(namespace)
        },
    },
    )
-   return &NamespaceController{
-       controller: controller,
-   }
+   namespaceController.store = store
+   namespaceController.controller = controller
+   return namespaceController
}
-// Run begins observing the system. It starts a goroutine and returns immediately.
-func (nm *NamespaceController) Run() {
-   if nm.StopEverything == nil {
-       nm.StopEverything = make(chan struct{})
-       go nm.controller.Run(nm.StopEverything)
-   }
-}
-
-// Stop gracefully shutsdown this controller
-func (nm *NamespaceController) Stop() {
-   if nm.StopEverything != nil {
-       close(nm.StopEverything)
-       nm.StopEverything = nil
-   }
-}
-
-// finalized returns true if the spec.finalizers is empty list
-func finalized(namespace *api.Namespace) bool {
-   return len(namespace.Spec.Finalizers) == 0
-}
-
-// finalize will finalize the namespace for kubernetes
-func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) {
-   namespaceFinalize := api.Namespace{}
-   namespaceFinalize.ObjectMeta = namespace.ObjectMeta
-   namespaceFinalize.Spec = namespace.Spec
-   finalizerSet := sets.NewString()
-   for i := range namespace.Spec.Finalizers {
-       if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes {
-           finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
-       }
-   }
-   namespaceFinalize.Spec.Finalizers = make([]api.FinalizerName, 0, len(finalizerSet))
-   for _, value := range finalizerSet.List() {
-       namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, api.FinalizerName(value))
-   }
-   namespace, err := kubeClient.Core().Namespaces().Finalize(&namespaceFinalize)
-   if err != nil {
-       // it was removed already, so life is good
-       if errors.IsNotFound(err) {
-           return namespace, nil
-       }
-   }
-   return namespace, err
-}
-
-type contentRemainingError struct {
-   Estimate int64
-}
-
-func (e *contentRemainingError) Error() string {
-   return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate)
-}
-
-// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate
-// of the time remaining before the remaining resources are deleted. If estimate > 0 not all resources
-// are guaranteed to be gone.
-func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace string, before unversioned.Time) (estimate int64, err error) {
-   err = deleteServiceAccounts(kubeClient, namespace)
-   if err != nil {
-       return estimate, err
-   }
-   err = deleteServices(kubeClient, namespace)
-   if err != nil {
-       return estimate, err
-   }
-   err = deleteReplicationControllers(kubeClient, namespace)
-   if err != nil {
-       return estimate, err
-   }
-   estimate, err = deletePods(kubeClient, namespace, before)
-   if err != nil {
-       return estimate, err
-   }
-   err = deleteSecrets(kubeClient, namespace)
-   if err != nil {
-       return estimate, err
-   }
-   err = deletePersistentVolumeClaims(kubeClient, namespace)
-   if err != nil {
-       return estimate, err
-   }
-   err = deleteLimitRanges(kubeClient, namespace)
-   if err != nil {
-       return estimate, err
-   }
-   err = deleteResourceQuotas(kubeClient, namespace)
-   if err != nil {
-       return estimate, err
-   }
-   err = deleteEvents(kubeClient, namespace)
-   if err != nil {
-       return estimate, err
-   }
-   // If experimental mode, delete all experimental resources for the namespace.
-   if containsVersion(versions, "extensions/v1beta1") {
-       resources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("extensions/v1beta1")
-       if err != nil {
-           return estimate, err
-       }
-       if containsResource(resources, "horizontalpodautoscalers") {
-           err = deleteHorizontalPodAutoscalers(kubeClient.Extensions(), namespace)
-           if err != nil {
-               return estimate, err
-           }
-       }
-       if containsResource(resources, "ingresses") {
-           err = deleteIngress(kubeClient.Extensions(), namespace)
-           if err != nil {
-               return estimate, err
-           }
-       }
-       if containsResource(resources, "daemonsets") {
-           err = deleteDaemonSets(kubeClient.Extensions(), namespace)
-           if err != nil {
-               return estimate, err
-           }
-       }
-       if containsResource(resources, "jobs") {
-           err = deleteJobs(kubeClient.Extensions(), namespace)
-           if err != nil {
-               return estimate, err
-           }
-       }
-       if containsResource(resources, "deployments") {
-           err = deleteDeployments(kubeClient.Extensions(), namespace)
-           if err != nil {
-               return estimate, err
-           }
-       }
-       if containsResource(resources, "replicasets") {
-           err = deleteReplicaSets(kubeClient.Extensions(), namespace)
-           if err != nil {
-               return estimate, err
-           }
-       }
-   }
-   return estimate, nil
-}
-
-// updateNamespaceFunc is a function that makes an update to a namespace
-type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error)
-
-// retryOnConflictError retries the specified fn if there was a conflict error
-// TODO RetryOnConflict should be a generic concept in client code
-func retryOnConflictError(kubeClient clientset.Interface, namespace *api.Namespace, fn updateNamespaceFunc) (result *api.Namespace, err error) {
-   latestNamespace := namespace
-   for {
-       result, err = fn(kubeClient, latestNamespace)
-       if err == nil {
-           return result, nil
-       }
-       if !errors.IsConflict(err) {
-           return nil, err
-       }
-       latestNamespace, err = kubeClient.Core().Namespaces().Get(latestNamespace.Name)
-       if err != nil {
-           return nil, err
-       }
-   }
-   return
-}
-
-// updateNamespaceStatusFunc will verify that the status of the namespace is correct
-func updateNamespaceStatusFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) {
-   if namespace.DeletionTimestamp.IsZero() || namespace.Status.Phase == api.NamespaceTerminating {
-       return namespace, nil
-   }
-   newNamespace := api.Namespace{}
-   newNamespace.ObjectMeta = namespace.ObjectMeta
-   newNamespace.Status = namespace.Status
-   newNamespace.Status.Phase = api.NamespaceTerminating
-   return kubeClient.Core().Namespaces().UpdateStatus(&newNamespace)
-}
-
-// syncNamespace orchestrates deletion of a Namespace and its associated content.
-func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace *api.Namespace) error {
-   if namespace.DeletionTimestamp == nil {
-       return nil
-   }
-   // multiple controllers may edit a namespace during termination
-   // first get the latest state of the namespace before proceeding
-   // if the namespace was deleted already, don't do anything
-   namespace, err := kubeClient.Core().Namespaces().Get(namespace.Name)
-   if err != nil {
-       if errors.IsNotFound(err) {
-           return nil
-       }
-       return err
-   }
-   glog.V(4).Infof("Syncing namespace %s", namespace.Name)
-   // ensure that the status is up to date on the namespace
-   // if we get a not found error, we assume the namespace is truly gone
-   namespace, err = retryOnConflictError(kubeClient, namespace, updateNamespaceStatusFunc)
-   if err != nil {
-       if errors.IsNotFound(err) {
-           return nil
-       }
-       return err
-   }
-   // if the namespace is already finalized, delete it
-   if finalized(namespace) {
-       err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-       return nil
-   }
-   // there may still be content for us to remove
-   estimate, err := deleteAllContent(kubeClient, versions, namespace.Name, *namespace.DeletionTimestamp)
-   if err != nil {
-       return err
-   }
-   if estimate > 0 {
-       return &contentRemainingError{estimate}
-   }
-   // we have removed content, so mark it finalized by us
-   result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc)
-   if err != nil {
-       return err
-   }
-   // now check if all finalizers have reported that we delete now
-   if finalized(result) {
-       err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
+// enqueueNamespace adds an object to the controller work queue
+// obj could be an *api.Namespace, or a DeletionFinalStateUnknown item.
+func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
+   key, err := controller.KeyFunc(obj)
+   if err != nil {
+       glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+       return
+   }
+   nm.queue.Add(key)
+}
+
+// worker processes the queue of namespace objects.
+// Each namespace can be in the queue at most once.
+// The system ensures that no two workers can process
+// the same namespace at the same time.
+func (nm *NamespaceController) worker() {
+   for {
+       func() {
+           key, quit := nm.queue.Get()
+           if quit {
+               return
+           }
+           defer nm.queue.Done(key)
+           if err := nm.syncNamespaceFromKey(key.(string)); err != nil {
+               if estimate, ok := err.(*contentRemainingError); ok {
+                   go func() {
+                       t := estimate.Estimate/2 + 1
+                       glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t)
+                       time.Sleep(time.Duration(t) * time.Second)
+                       nm.queue.Add(key)
+                   }()
+               }
+           }
+       }()
+   }
+}
+
+// syncNamespaceFromKey looks for a namespace with the specified key in its store and synchronizes it
+func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
+   startTime := time.Now()
+   defer glog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Now().Sub(startTime))
+
+   obj, exists, err := nm.store.GetByKey(key)
+   if !exists {
+       glog.Infof("Namespace has been deleted %v", key)
+       return nil
+   }
+   if err != nil {
+       glog.Infof("Unable to retrieve namespace %v from store: %v", key, err)
+       nm.queue.Add(key)
+       return err
+   }
+   namespace := obj.(*api.Namespace)
+   return syncNamespace(nm.kubeClient, nm.versions, namespace)
+}
+
+// Run starts observing the system with the specified number of workers.
+func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
+   defer utilruntime.HandleCrash()
+   go nm.controller.Run(stopCh)
+   for i := 0; i < workers; i++ {
+       go wait.Until(nm.worker, time.Second, stopCh)
+   }
+   <-stopCh
+   glog.Infof("Shutting down NamespaceController")
+   nm.queue.ShutDown()
+}
-func deleteLimitRanges(kubeClient clientset.Interface, ns string) error {
-   items, err := kubeClient.Core().LimitRanges(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := kubeClient.Core().LimitRanges(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deleteResourceQuotas(kubeClient clientset.Interface, ns string) error {
-   resourceQuotas, err := kubeClient.Core().ResourceQuotas(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range resourceQuotas.Items {
-       err := kubeClient.Core().ResourceQuotas(ns).Delete(resourceQuotas.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deleteServiceAccounts(kubeClient clientset.Interface, ns string) error {
-   items, err := kubeClient.Core().ServiceAccounts(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := kubeClient.Core().ServiceAccounts(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deleteServices(kubeClient clientset.Interface, ns string) error {
-   items, err := kubeClient.Core().Services(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := kubeClient.Core().Services(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deleteReplicationControllers(kubeClient clientset.Interface, ns string) error {
-   items, err := kubeClient.Core().ReplicationControllers(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := kubeClient.Core().ReplicationControllers(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deletePods(kubeClient clientset.Interface, ns string, before unversioned.Time) (int64, error) {
-   items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{})
-   if err != nil {
-       return 0, err
-   }
-   expired := unversioned.Now().After(before.Time)
-   var deleteOptions *api.DeleteOptions
-   if expired {
-       deleteOptions = api.NewDeleteOptions(0)
-   }
-   estimate := int64(0)
-   for i := range items.Items {
-       if items.Items[i].Spec.TerminationGracePeriodSeconds != nil {
-           grace := *items.Items[i].Spec.TerminationGracePeriodSeconds
-           if grace > estimate {
-               estimate = grace
-           }
-       }
-       err := kubeClient.Core().Pods(ns).Delete(items.Items[i].Name, deleteOptions)
-       if err != nil && !errors.IsNotFound(err) {
-           return 0, err
-       }
-   }
-   if expired {
-       estimate = 0
-   }
-   return estimate, nil
-}
-
-func deleteEvents(kubeClient clientset.Interface, ns string) error {
-   return kubeClient.Core().Events(ns).DeleteCollection(nil, api.ListOptions{})
-}
-
-func deleteSecrets(kubeClient clientset.Interface, ns string) error {
-   items, err := kubeClient.Core().Secrets(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := kubeClient.Core().Secrets(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deletePersistentVolumeClaims(kubeClient clientset.Interface, ns string) error {
-   items, err := kubeClient.Core().PersistentVolumeClaims(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := kubeClient.Core().PersistentVolumeClaims(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deleteHorizontalPodAutoscalers(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-   items, err := expClient.HorizontalPodAutoscalers(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := expClient.HorizontalPodAutoscalers(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deleteDaemonSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-   items, err := expClient.DaemonSets(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := expClient.DaemonSets(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deleteJobs(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-   items, err := expClient.Jobs(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := expClient.Jobs(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deleteDeployments(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-   items, err := expClient.Deployments(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := expClient.Deployments(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deleteIngress(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-   items, err := expClient.Ingresses(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := expClient.Ingresses(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-func deleteReplicaSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-   items, err := expClient.ReplicaSets(ns).List(api.ListOptions{})
-   if err != nil {
-       return err
-   }
-   for i := range items.Items {
-       err := expClient.ReplicaSets(ns).Delete(items.Items[i].Name, nil)
-       if err != nil && !errors.IsNotFound(err) {
-           return err
-       }
-   }
-   return nil
-}
-
-// TODO: this is duplicated logic. Move it somewhere central?
-func containsVersion(versions *unversioned.APIVersions, version string) bool {
-   for ix := range versions.Versions {
-       if versions.Versions[ix] == version {
-           return true
-       }
-   }
-   return false
-}
-
-// TODO: this is duplicated logic. Move it somewhere central?
-func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
-   if resources == nil {
-       return false
-   }
-   for ix := range resources.APIResources {
-       resource := resources.APIResources[ix]
-       if resource.Name == resourceName {
-           return true
-       }
-   }
-   return false
-}
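The net effect of this file's change is to move from syncing inside the informer event handlers to a queue drained by N workers, where a namespace key can sit in the queue at most once. Here is a toy, self-contained sketch of that queue/worker shape; this is not pkg/util/workqueue (the real type also tracks in-flight keys via Done and supports ShutDown), and every name here is illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

type queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []string
	dirty map[string]bool // collapses duplicate adds, like workqueue.Type
}

func newQueue() *queue {
	q := &queue{dirty: map[string]bool{}}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add enqueues a key unless an identical key is already waiting.
func (q *queue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.dirty[key] {
		return // at most one copy queued per key
	}
	q.dirty[key] = true
	q.items = append(q.items, key)
	q.cond.Signal()
}

// Get blocks until a key is available, then hands it to exactly one caller.
func (q *queue) Get() string {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	key := q.items[0]
	q.items = q.items[1:]
	delete(q.dirty, key)
	return key
}

func main() {
	q := newQueue()
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // two workers, like --concurrent-namespace-syncs=2
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 2; j++ {
				fmt.Println("synced", q.Get())
			}
		}()
	}
	for _, ns := range []string{"a", "b", "c", "d"} {
		q.Add(ns)
	}
	wg.Wait()
}
```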

View File

@@ -20,7 +20,6 @@ import (
    "fmt"
    "strings"
    "testing"
-   "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/errors"
@@ -104,25 +103,25 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIVersions) {
    // TODO: Reuse the constants for all these strings from testclient
    pendingActionSet := sets.NewString(
        strings.Join([]string{"get", "namespaces", ""}, "-"),
-       strings.Join([]string{"list", "replicationcontrollers", ""}, "-"),
+       strings.Join([]string{"delete-collection", "replicationcontrollers", ""}, "-"),
        strings.Join([]string{"list", "services", ""}, "-"),
        strings.Join([]string{"list", "pods", ""}, "-"),
-       strings.Join([]string{"list", "resourcequotas", ""}, "-"),
-       strings.Join([]string{"list", "secrets", ""}, "-"),
-       strings.Join([]string{"list", "limitranges", ""}, "-"),
+       strings.Join([]string{"delete-collection", "resourcequotas", ""}, "-"),
+       strings.Join([]string{"delete-collection", "secrets", ""}, "-"),
+       strings.Join([]string{"delete-collection", "limitranges", ""}, "-"),
        strings.Join([]string{"delete-collection", "events", ""}, "-"),
-       strings.Join([]string{"list", "serviceaccounts", ""}, "-"),
-       strings.Join([]string{"list", "persistentvolumeclaims", ""}, "-"),
+       strings.Join([]string{"delete-collection", "serviceaccounts", ""}, "-"),
+       strings.Join([]string{"delete-collection", "persistentvolumeclaims", ""}, "-"),
        strings.Join([]string{"create", "namespaces", "finalize"}, "-"),
    )
    if containsVersion(versions, "extensions/v1beta1") {
        pendingActionSet.Insert(
-           strings.Join([]string{"list", "daemonsets", ""}, "-"),
-           strings.Join([]string{"list", "deployments", ""}, "-"),
-           strings.Join([]string{"list", "jobs", ""}, "-"),
-           strings.Join([]string{"list", "horizontalpodautoscalers", ""}, "-"),
-           strings.Join([]string{"list", "ingresses", ""}, "-"),
+           strings.Join([]string{"delete-collection", "daemonsets", ""}, "-"),
+           strings.Join([]string{"delete-collection", "deployments", ""}, "-"),
+           strings.Join([]string{"delete-collection", "jobs", ""}, "-"),
+           strings.Join([]string{"delete-collection", "horizontalpodautoscalers", ""}, "-"),
+           strings.Join([]string{"delete-collection", "ingresses", ""}, "-"),
            strings.Join([]string{"get", "resource", ""}, "-"),
        )
    }
@@ -225,25 +224,3 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
        t.Errorf("Expected no action from controller, but got: %v", mockClient.Actions())
    }
}
-
-func TestRunStop(t *testing.T) {
-   mockClient := &fake.Clientset{}
-   nsController := NewNamespaceController(mockClient, &unversioned.APIVersions{}, 1*time.Second)
-
-   if nsController.StopEverything != nil {
-       t.Errorf("Non-running manager should not have a stop channel. Got %v", nsController.StopEverything)
-   }
-
-   nsController.Run()
-
-   if nsController.StopEverything == nil {
-       t.Errorf("Running manager should have a stop channel. Got nil")
-   }
-
-   nsController.Stop()
-
-   if nsController.StopEverything != nil {
-       t.Errorf("Non-running manager should not have a stop channel. Got %v", nsController.StopEverything)
-   }
-}

View File

@@ -0,0 +1,364 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package namespace
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
extensions_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog"
)
// contentRemainingError is used to inform the caller that content is not fully removed from the namespace
type contentRemainingError struct {
Estimate int64
}
func (e *contentRemainingError) Error() string {
return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate)
}
// updateNamespaceFunc is a function that makes an update to a namespace
type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error)
// retryOnConflictError retries the specified fn if there was a conflict error
// TODO RetryOnConflict should be a generic concept in client code
func retryOnConflictError(kubeClient clientset.Interface, namespace *api.Namespace, fn updateNamespaceFunc) (result *api.Namespace, err error) {
latestNamespace := namespace
for {
result, err = fn(kubeClient, latestNamespace)
if err == nil {
return result, nil
}
if !errors.IsConflict(err) {
return nil, err
}
latestNamespace, err = kubeClient.Core().Namespaces().Get(latestNamespace.Name)
if err != nil {
return nil, err
}
}
return
}
// updateNamespaceStatusFunc will verify that the status of the namespace is correct
func updateNamespaceStatusFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) {
if namespace.DeletionTimestamp.IsZero() || namespace.Status.Phase == api.NamespaceTerminating {
return namespace, nil
}
newNamespace := api.Namespace{}
newNamespace.ObjectMeta = namespace.ObjectMeta
newNamespace.Status = namespace.Status
newNamespace.Status.Phase = api.NamespaceTerminating
return kubeClient.Core().Namespaces().UpdateStatus(&newNamespace)
}
// finalized returns true if the namespace.Spec.Finalizers is an empty list
func finalized(namespace *api.Namespace) bool {
return len(namespace.Spec.Finalizers) == 0
}
// finalizeNamespaceFunc removes the kubernetes token and finalizes the namespace
func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) {
namespaceFinalize := api.Namespace{}
namespaceFinalize.ObjectMeta = namespace.ObjectMeta
namespaceFinalize.Spec = namespace.Spec
finalizerSet := sets.NewString()
for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes {
finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
}
}
namespaceFinalize.Spec.Finalizers = make([]api.FinalizerName, 0, len(finalizerSet))
for _, value := range finalizerSet.List() {
namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, api.FinalizerName(value))
}
namespace, err := kubeClient.Core().Namespaces().Finalize(&namespaceFinalize)
if err != nil {
// it was removed already, so life is good
if errors.IsNotFound(err) {
return namespace, nil
}
}
return namespace, err
}
// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate
// of the time remaining before the remaining resources are deleted. If estimate > 0 not all resources
// are guaranteed to be gone.
// TODO: this should use discovery to delete arbitrary namespace content
func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace string, before unversioned.Time) (estimate int64, err error) {
err = deleteServiceAccounts(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteServices(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteReplicationControllers(kubeClient, namespace)
if err != nil {
return estimate, err
}
estimate, err = deletePods(kubeClient, namespace, before)
if err != nil {
return estimate, err
}
err = deleteSecrets(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deletePersistentVolumeClaims(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteLimitRanges(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteResourceQuotas(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteEvents(kubeClient, namespace)
if err != nil {
return estimate, err
}
// If experimental mode, delete all experimental resources for the namespace.
if containsVersion(versions, "extensions/v1beta1") {
resources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("extensions/v1beta1")
if err != nil {
return estimate, err
}
if containsResource(resources, "horizontalpodautoscalers") {
err = deleteHorizontalPodAutoscalers(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
if containsResource(resources, "ingresses") {
err = deleteIngress(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
if containsResource(resources, "daemonsets") {
err = deleteDaemonSets(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
if containsResource(resources, "jobs") {
err = deleteJobs(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
if containsResource(resources, "deployments") {
err = deleteDeployments(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
}
return estimate, nil
}
// syncNamespace orchestrates deletion of a Namespace and its associated content.
func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace *api.Namespace) error {
if namespace.DeletionTimestamp == nil {
return nil
}
// multiple controllers may edit a namespace during termination
// first get the latest state of the namespace before proceeding
// if the namespace was deleted already, don't do anything
namespace, err := kubeClient.Core().Namespaces().Get(namespace.Name)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
glog.V(4).Infof("Syncing namespace %s", namespace.Name)
// ensure that the status is up to date on the namespace
// if we get a not found error, we assume the namespace is truly gone
namespace, err = retryOnConflictError(kubeClient, namespace, updateNamespaceStatusFunc)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
// if the namespace is already finalized, delete it
if finalized(namespace) {
err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil)
if err != nil && !errors.IsNotFound(err) {
return err
}
return nil
}
// there may still be content for us to remove
estimate, err := deleteAllContent(kubeClient, versions, namespace.Name, *namespace.DeletionTimestamp)
if err != nil {
return err
}
if estimate > 0 {
return &contentRemainingError{estimate}
}
// we have removed content, so mark it finalized by us
result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc)
if err != nil {
return err
}
// now check if all finalizers have reported that we delete now
if finalized(result) {
err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil)
if err != nil && !errors.IsNotFound(err) {
return err
}
}
return nil
}
func deleteLimitRanges(kubeClient clientset.Interface, ns string) error {
return kubeClient.Core().LimitRanges(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteResourceQuotas(kubeClient clientset.Interface, ns string) error {
return kubeClient.Core().ResourceQuotas(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteServiceAccounts(kubeClient clientset.Interface, ns string) error {
return kubeClient.Core().ServiceAccounts(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteServices(kubeClient clientset.Interface, ns string) error {
items, err := kubeClient.Core().Services(ns).List(api.ListOptions{})
if err != nil {
return err
}
for i := range items.Items {
err := kubeClient.Core().Services(ns).Delete(items.Items[i].Name, nil)
if err != nil && !errors.IsNotFound(err) {
return err
}
}
return nil
}
func deleteReplicationControllers(kubeClient clientset.Interface, ns string) error {
return kubeClient.Core().ReplicationControllers(ns).DeleteCollection(nil, api.ListOptions{})
}
func deletePods(kubeClient clientset.Interface, ns string, before unversioned.Time) (int64, error) {
items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{})
if err != nil {
return 0, err
}
expired := unversioned.Now().After(before.Time)
var deleteOptions *api.DeleteOptions
if expired {
deleteOptions = api.NewDeleteOptions(0)
}
estimate := int64(0)
for i := range items.Items {
if items.Items[i].Spec.TerminationGracePeriodSeconds != nil {
grace := *items.Items[i].Spec.TerminationGracePeriodSeconds
if grace > estimate {
estimate = grace
}
}
err := kubeClient.Core().Pods(ns).Delete(items.Items[i].Name, deleteOptions)
if err != nil && !errors.IsNotFound(err) {
return 0, err
}
}
if expired {
estimate = 0
}
return estimate, nil
}
func deleteEvents(kubeClient clientset.Interface, ns string) error {
return kubeClient.Core().Events(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteSecrets(kubeClient clientset.Interface, ns string) error {
return kubeClient.Core().Secrets(ns).DeleteCollection(nil, api.ListOptions{})
}
func deletePersistentVolumeClaims(kubeClient clientset.Interface, ns string) error {
return kubeClient.Core().PersistentVolumeClaims(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteHorizontalPodAutoscalers(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
return expClient.HorizontalPodAutoscalers(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteDaemonSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
return expClient.DaemonSets(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteJobs(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
return expClient.Jobs(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteDeployments(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
return expClient.Deployments(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteIngress(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
return expClient.Ingresses(ns).DeleteCollection(nil, api.ListOptions{})
}
// TODO: this is duplicated logic. Move it somewhere central?
func containsVersion(versions *unversioned.APIVersions, version string) bool {
for ix := range versions.Versions {
if versions.Versions[ix] == version {
return true
}
}
return false
}
// TODO: this is duplicated logic. Move it somewhere central?
func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
if resources == nil {
return false
}
for ix := range resources.APIResources {
resource := resources.APIResources[ix]
if resource.Name == resourceName {
return true
}
}
return false
}
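Most helpers in this new file shrink to a single line because DeleteCollection lets the API server fan out the per-object deletes, replacing a List followed by one Delete call per item. A toy sketch of the round-trip arithmetic, with a purely illustrative client type (not the generated clientset):

```go
package main

import "fmt"

type client struct{ objects []string }

// deleteEach is the old shape: one List, then one Delete per object.
func (c *client) deleteEach() int {
	calls := 1 // the List
	for range c.objects {
		calls++ // one Delete per item
	}
	c.objects = nil
	return calls
}

// deleteCollection is the new shape: a single server-side call.
func (c *client) deleteCollection() int {
	c.objects = nil
	return 1
}

func main() {
	fmt.Println((&client{objects: make([]string, 5)}).deleteEach())       // 6 round trips
	fmt.Println((&client{objects: make([]string, 5)}).deleteCollection()) // 1 round trip
}
```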