Skeleton of new attach detach controller

pull/6/head
saadali 2016-04-29 23:36:27 -07:00
parent e973b5d27a
commit 214b4c28bc
3 changed files with 141 additions and 0 deletions

View File

@ -65,6 +65,7 @@ import (
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/volume"
"k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount"
@ -195,8 +196,10 @@ func Run(s *options.CMServer) error {
func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
podInformer := informers.CreateSharedPodIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)())
nodeInformer := informers.CreateSharedNodeIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-informer")), ResyncPeriod(s)())
informers := map[reflect.Type]framework.SharedIndexInformer{}
informers[reflect.TypeOf(&api.Pod{})] = podInformer
informers[reflect.TypeOf(&api.Node{})] = nodeInformer
go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).
Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop)
@ -402,6 +405,10 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
go volume.NewAttachDetachController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")), podInformer, nodeInformer, ResyncPeriod(s)()).
Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
var rootCA []byte
if s.RootCAFile != "" {

View File

@ -64,3 +64,21 @@ func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time.
return sharedIndexInformer
}
// CreateSharedNodeIndexInformer returns a SharedIndexInformer that lists and watches all nodes
func CreateSharedNodeIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedIndexInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Nodes().Watch(options)
},
},
&api.Node{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
return sharedIndexInformer
}

View File

@ -17,3 +17,119 @@ limitations under the License.
// Package volume implements a controller to manage volume attach and detach
// operations.
package volume
import (
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/util/runtime"
)
// AttachDetachController defines the operations supported by this controller.
type AttachDetachController interface {
Run(stopCh <-chan struct{})
}
type attachDetachController struct {
// internalPodInformer is the shared pod informer used to fetch and store
// pod objects from the API server. It is shared with other controllers and
// therefore the pod objects in its store should be treated as immutable.
internalPodInformer framework.SharedInformer
// selfCreatedPodInformer is true if the internalPodInformer was created
// during initialization, not passed in.
selfCreatedPodInformer bool
// internalNodeInformer is the shared node informer used to fetch and store
// node objects from the API server. It is shared with other controllers
// and therefore the node objects in its store should be treated as
// immutable.
internalNodeInformer framework.SharedInformer
// selfCreatedNodeInformer is true if the internalNodeInformer was created
// during initialization, not passed in.
selfCreatedNodeInformer bool
}
// NewAttachDetachController returns a new instance of AttachDetachController.
func NewAttachDetachController(
kubeClient internalclientset.Interface,
podInformer framework.SharedInformer,
nodeInformer framework.SharedInformer,
resyncPeriod time.Duration) AttachDetachController {
selfCreatedPodInformer := false
selfCreatedNodeInformer := false
if podInformer == nil {
podInformer = informers.CreateSharedPodInformer(kubeClient, resyncPeriod)
selfCreatedPodInformer = true
}
if nodeInformer == nil {
nodeInformer = informers.CreateSharedNodeIndexInformer(kubeClient, resyncPeriod)
selfCreatedNodeInformer = true
}
adc := &attachDetachController{
internalPodInformer: podInformer,
selfCreatedPodInformer: selfCreatedPodInformer,
internalNodeInformer: nodeInformer,
selfCreatedNodeInformer: selfCreatedNodeInformer,
}
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: adc.podAdd,
UpdateFunc: adc.podUpdate,
DeleteFunc: adc.podDelete,
})
nodeInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: adc.nodeAdd,
UpdateFunc: adc.nodeUpdate,
DeleteFunc: adc.nodeDelete,
})
return adc
}
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
glog.Infof("Starting Attach Detach Controller")
// Start self-created shared informers
if adc.selfCreatedPodInformer {
go adc.internalPodInformer.Run(stopCh)
}
if adc.selfCreatedNodeInformer {
go adc.internalNodeInformer.Run(stopCh)
}
<-stopCh
glog.Infof("Shutting down Attach Detach Controller")
}
func (adc *attachDetachController) podAdd(obj interface{}) {
// No op for now
}
func (adc *attachDetachController) podUpdate(oldObj, newObj interface{}) {
// No op for now
}
func (adc *attachDetachController) podDelete(obj interface{}) {
// No op for now
}
func (adc *attachDetachController) nodeAdd(obj interface{}) {
// No op for now
}
func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
// No op for now
}
func (adc *attachDetachController) nodeDelete(obj interface{}) {
// No op for now
}