update resource quota controller for shared informers

pull/6/head
deads2k 2016-04-19 10:46:31 -04:00
parent 8c4e3af1a3
commit 60fe17d338
5 changed files with 32 additions and 28 deletions

View File

@ -263,7 +263,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(resourceQuotaControllerClient),
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(podInformer, resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(s),
GroupKindsToReplenish: groupKindsToReplenish,
}

View File

@ -193,7 +193,7 @@ func (s *CMServer) Run(_ []string) error {
Registry: resourceQuotaRegistry,
GroupKindsToReplenish: groupKindsToReplenish,
ReplenishmentResyncPeriod: s.resyncPeriod,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(resourceQuotaControllerClient),
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactoryFromClient(resourceQuotaControllerClient),
}
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)

View File

@ -28,6 +28,7 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/quota/evaluator/core"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -86,43 +87,46 @@ func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func
// ReplenishmentControllerFactory knows how to build replenishment controllers
type ReplenishmentControllerFactory interface {
// NewController returns a controller configured with the specified options
NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error)
// NewController returns a controller configured with the specified options.
// This method is NOT thread-safe.
NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error)
}
// replenishmentControllerFactory implements ReplenishmentControllerFactory
type replenishmentControllerFactory struct {
kubeClient clientset.Interface
kubeClient clientset.Interface
podInformer framework.SharedInformer
}
// NewReplenishmentControllerFactory returns a factory that knows how to build controllers
// to replenish resources when updated or deleted
func NewReplenishmentControllerFactory(kubeClient clientset.Interface) ReplenishmentControllerFactory {
func NewReplenishmentControllerFactory(podInformer framework.SharedInformer, kubeClient clientset.Interface) ReplenishmentControllerFactory {
return &replenishmentControllerFactory{
kubeClient: kubeClient,
kubeClient: kubeClient,
podInformer: podInformer,
}
}
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error) {
var result *framework.Controller
func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) ReplenishmentControllerFactory {
return NewReplenishmentControllerFactory(nil, kubeClient)
}
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error) {
var result framework.ControllerInterface
switch options.GroupKind {
case api.Kind("Pod"):
_, result = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
options.ResyncPeriod(),
framework.ResourceEventHandlerFuncs{
if r.podInformer != nil {
r.podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
UpdateFunc: PodReplenishmentUpdateFunc(options),
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
})
result = r.podInformer.GetController()
break
}
r.podInformer = informers.CreateSharedPodInformer(r.kubeClient, options.ResyncPeriod())
result = r.podInformer
case api.Kind("Service"):
_, result = framework.NewInformer(
&cache.ListWatch{

View File

@ -69,7 +69,7 @@ type ResourceQuotaController struct {
// knows how to calculate usage
registry quota.Registry
// controllers monitoring to notify for replenishment
replenishmentControllers []*framework.Controller
replenishmentControllers []framework.ControllerInterface
}
func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
@ -79,7 +79,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
queue: workqueue.New(),
resyncPeriod: options.ResyncPeriod,
registry: options.Registry,
replenishmentControllers: []*framework.Controller{},
replenishmentControllers: []framework.ControllerInterface{},
}
// set the synchronization handler

View File

@ -113,7 +113,7 @@ func TestSyncResourceQuota(t *testing.T) {
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
},
ControllerFactory: NewReplenishmentControllerFactory(kubeClient),
ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
}
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
@ -199,7 +199,7 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) {
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
},
ControllerFactory: NewReplenishmentControllerFactory(kubeClient),
ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
}
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
@ -276,7 +276,7 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
},
ControllerFactory: NewReplenishmentControllerFactory(kubeClient),
ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient),
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
}
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)