From 214b4c28bc5f4df4ce961550ca60fc42ad3c9f82 Mon Sep 17 00:00:00 2001 From: saadali Date: Fri, 29 Apr 2016 23:36:27 -0700 Subject: [PATCH] Skeleton of new attach detach controller --- .../app/controllermanager.go | 7 ++ pkg/controller/framework/informers/factory.go | 18 +++ .../volume/attach_detach_controller.go | 116 ++++++++++++++++++ 3 files changed, 141 insertions(+) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 84e44fd12f..fb6d54355f 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -65,6 +65,7 @@ import ( routecontroller "k8s.io/kubernetes/pkg/controller/route" servicecontroller "k8s.io/kubernetes/pkg/controller/service" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" + "k8s.io/kubernetes/pkg/controller/volume" "k8s.io/kubernetes/pkg/healthz" quotainstall "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/serviceaccount" @@ -195,8 +196,10 @@ func Run(s *options.CMServer) error { func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { podInformer := informers.CreateSharedPodIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)()) + nodeInformer := informers.CreateSharedNodeIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-informer")), ResyncPeriod(s)()) informers := map[reflect.Type]framework.SharedIndexInformer{} informers[reflect.TypeOf(&api.Pod{})] = podInformer + informers[reflect.TypeOf(&api.Node{})] = nodeInformer go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop) @@ -402,6 +405,10 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } + go volume.NewAttachDetachController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")), podInformer, nodeInformer, ResyncPeriod(s)()). + Run(wait.NeverStop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + var rootCA []byte if s.RootCAFile != "" { diff --git a/pkg/controller/framework/informers/factory.go b/pkg/controller/framework/informers/factory.go index 3c3472cc52..0128407e97 100644 --- a/pkg/controller/framework/informers/factory.go +++ b/pkg/controller/framework/informers/factory.go @@ -64,3 +64,21 @@ func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time. return sharedIndexInformer } + +// CreateSharedNodeIndexInformer returns a SharedIndexInformer that lists and watches all nodes +func CreateSharedNodeIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { + sharedIndexInformer := framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().Nodes().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().Nodes().Watch(options) + }, + }, + &api.Node{}, + resyncPeriod, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + + return sharedIndexInformer +} diff --git a/pkg/controller/volume/attach_detach_controller.go b/pkg/controller/volume/attach_detach_controller.go index 683a95e830..0579969970 100644 --- a/pkg/controller/volume/attach_detach_controller.go +++ b/pkg/controller/volume/attach_detach_controller.go @@ -17,3 +17,119 @@ limitations under the License. // Package volume implements a controller to manage volume attach and detach // operations. package volume + +import ( + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" + "k8s.io/kubernetes/pkg/util/runtime" +) + +// AttachDetachController defines the operations supported by this controller. +type AttachDetachController interface { + Run(stopCh <-chan struct{}) +} + +type attachDetachController struct { + // internalPodInformer is the shared pod informer used to fetch and store + // pod objects from the API server. It is shared with other controllers and + // therefore the pod objects in its store should be treated as immutable. + internalPodInformer framework.SharedInformer + + // selfCreatedPodInformer is true if the internalPodInformer was created + // during initialization, not passed in. + selfCreatedPodInformer bool + + // internalNodeInformer is the shared node informer used to fetch and store + // node objects from the API server. It is shared with other controllers + // and therefore the node objects in its store should be treated as + // immutable. + internalNodeInformer framework.SharedInformer + + // selfCreatedNodeInformer is true if the internalNodeInformer was created + // during initialization, not passed in. + selfCreatedNodeInformer bool +} + +// NewAttachDetachController returns a new instance of AttachDetachController. +func NewAttachDetachController( + kubeClient internalclientset.Interface, + podInformer framework.SharedInformer, + nodeInformer framework.SharedInformer, + resyncPeriod time.Duration) AttachDetachController { + selfCreatedPodInformer := false + selfCreatedNodeInformer := false + if podInformer == nil { + podInformer = informers.CreateSharedPodInformer(kubeClient, resyncPeriod) + selfCreatedPodInformer = true + } + if nodeInformer == nil { + nodeInformer = informers.CreateSharedNodeIndexInformer(kubeClient, resyncPeriod) + selfCreatedNodeInformer = true + } + + adc := &attachDetachController{ + internalPodInformer: podInformer, + selfCreatedPodInformer: selfCreatedPodInformer, + internalNodeInformer: nodeInformer, + selfCreatedNodeInformer: selfCreatedNodeInformer, + } + + podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ + AddFunc: adc.podAdd, + UpdateFunc: adc.podUpdate, + DeleteFunc: adc.podDelete, + }) + + nodeInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ + AddFunc: adc.nodeAdd, + UpdateFunc: adc.nodeUpdate, + DeleteFunc: adc.nodeDelete, + }) + + return adc +} + +func (adc *attachDetachController) Run(stopCh <-chan struct{}) { + defer runtime.HandleCrash() + glog.Infof("Starting Attach Detach Controller") + + // Start self-created shared informers + if adc.selfCreatedPodInformer { + go adc.internalPodInformer.Run(stopCh) + } + + if adc.selfCreatedNodeInformer { + go adc.internalNodeInformer.Run(stopCh) + } + + <-stopCh + glog.Infof("Shutting down Attach Detach Controller") +} + +func (adc *attachDetachController) podAdd(obj interface{}) { + // No op for now +} + +func (adc *attachDetachController) podUpdate(oldObj, newObj interface{}) { + // No op for now +} + +func (adc *attachDetachController) podDelete(obj interface{}) { + // No op for now +} + +func (adc *attachDetachController) nodeAdd(obj interface{}) { + // No op for now +} + +func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) { + // No op for now +} + +func (adc *attachDetachController) nodeDelete(obj interface{}) { + // No op for now +}