Implement volume controller skeleton.

This is a simple controller that watches changes of PersistentVolumes and
PersistentVolumeClaims.
pull/6/head
Jan Safranek 2016-05-17 14:55:04 +02:00
parent b86e5923b2
commit 71aa892a86
3 changed files with 251 additions and 42 deletions

View File

@ -373,38 +373,21 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
}
}
volumePlugins := ProbeRecyclableVolumePlugins(s.VolumeConfiguration)
provisioner, err := NewVolumeProvisioner(cloud, s.VolumeConfiguration)
if err != nil {
glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.")
}
pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")), s.PVClaimBinderSyncPeriod.Duration)
pvclaimBinder.Run()
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-recycler")),
volumeController := persistentvolumecontroller.NewPersistentVolumeController(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")),
s.PVClaimBinderSyncPeriod.Duration,
int(s.VolumeConfiguration.PersistentVolumeRecyclerConfiguration.MaximumRetry),
provisioner,
ProbeRecyclableVolumePlugins(s.VolumeConfiguration),
cloud,
)
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
}
pvRecycler.Run()
volumeController.Run()
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
if provisioner != nil {
pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-provisioner"))), s.PVClaimBinderSyncPeriod.Duration, s.ClusterName, volumePlugins, provisioner, cloud)
if err != nil {
glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err)
}
pvController.Run()
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
go volume.NewAttachDetachController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")), podInformer, nodeInformer, ResyncPeriod(s)()).
Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

View File

@ -271,37 +271,19 @@ func (s *CMServer) Run(_ []string) error {
}
}
volumePlugins := kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfiguration)
provisioner, err := kubecontrollermanager.NewVolumeProvisioner(cloud, s.VolumeConfiguration)
if err != nil {
glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.")
}
pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(
volumeController := persistentvolumecontroller.NewPersistentVolumeController(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")),
s.PVClaimBinderSyncPeriod.Duration,
)
pvclaimBinder.Run()
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-recycler")),
s.PVClaimBinderSyncPeriod.Duration,
int(s.VolumeConfiguration.PersistentVolumeRecyclerConfiguration.MaximumRetry),
provisioner,
kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfiguration),
cloud,
)
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
}
pvRecycler.Run()
if provisioner != nil {
pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-controller"))), s.PVClaimBinderSyncPeriod.Duration, s.ClusterName, volumePlugins, provisioner, cloud)
if err != nil {
glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err)
}
pvController.Run()
}
volumeController.Run()
var rootCA []byte

View File

@ -0,0 +1,244 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// PersistentVolumeController is a controller that synchronizes
// PersistentVolumeClaims and PersistentVolumes. It starts two
// framework.Controllers that watch PerstentVolume and PersistentVolumeClaim
// changes.
type PersistentVolumeController struct {
volumes persistentVolumeOrderedIndex
volumeController *framework.Controller
volumeControllerStopCh chan struct{}
claims cache.Store
claimController *framework.Controller
claimControllerStopCh chan struct{}
kubeClient clientset.Interface
}
// NewPersistentVolumeController creates a new PersistentVolumeController
func NewPersistentVolumeController(
kubeClient clientset.Interface,
syncPeriod time.Duration,
provisioner vol.ProvisionableVolumePlugin,
recyclers []vol.VolumePlugin,
cloud cloudprovider.Interface) *PersistentVolumeController {
controller := &PersistentVolumeController{
kubeClient: kubeClient,
}
volumeSource := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Core().PersistentVolumes().Watch(options)
},
}
claimSource := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
},
}
controller.initializeController(syncPeriod, volumeSource, claimSource)
return controller
}
// initializeController prepares watching for PersistentVolume and
// PersistentVolumeClaim events from given sources. This should be used to
// initialize the controller for real operation (with real event sources) and
// also during testing (with fake ones).
func (ctrl *PersistentVolumeController) initializeController(syncPeriod time.Duration, volumeSource, claimSource cache.ListerWatcher) {
glog.V(4).Infof("initializing PersistentVolumeController, sync every %s", syncPeriod.String())
ctrl.volumes.store, ctrl.volumeController = framework.NewIndexerInformer(
volumeSource,
&api.PersistentVolume{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: ctrl.addVolume,
UpdateFunc: ctrl.updateVolume,
DeleteFunc: ctrl.deleteVolume,
},
cache.Indexers{"accessmodes": accessModesIndexFunc},
)
ctrl.claims, ctrl.claimController = framework.NewInformer(
claimSource,
&api.PersistentVolumeClaim{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: ctrl.addClaim,
UpdateFunc: ctrl.updateClaim,
DeleteFunc: ctrl.deleteClaim,
},
)
}
// addVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
pv, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("expected PersistentVolume but handler received %+v", obj)
return
}
if err := ctrl.syncVolume(pv); err != nil {
glog.Errorf("PersistentVolumeController could not add volume %q: %+v", pv.Name, err)
}
}
// updateVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) {
newVolume, ok := newObj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
return
}
if err := ctrl.syncVolume(newVolume); err != nil {
glog.Errorf("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err)
}
}
// deleteVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
// Intentionally left blank - we do not react on deleted volumes
}
// addClaim is callback from framework.Controller watching PersistentVolumeClaim
// events.
func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
claim, ok := obj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
return
}
if err := ctrl.syncClaim(claim); err != nil {
glog.Errorf("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
}
}
// updateClaim is callback from framework.Controller watching PersistentVolumeClaim
// events.
func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) {
newClaim, ok := newObj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
return
}
if err := ctrl.syncClaim(newClaim); err != nil {
glog.Errorf("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
}
}
// deleteClaim is callback from framework.Controller watching PersistentVolumeClaim
// events.
func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
var volume *api.PersistentVolume
var claim *api.PersistentVolumeClaim
var ok bool
claim, ok = obj.(*api.PersistentVolumeClaim)
if !ok {
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
claim, ok = unknown.Obj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %+v", unknown.Obj)
return
}
} else {
glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %+v", obj)
return
}
}
if !ok || claim == nil {
return
}
if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists {
if volume, ok = pvObj.(*api.PersistentVolume); ok {
// sync the volume when its claim is deleted. Explicitly sync'ing the
// volume here in response to claim deletion prevents the volume from
// waiting until the next sync period for its Release.
if volume != nil {
err := ctrl.syncVolume(volume)
if err != nil {
glog.Errorf("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
}
}
} else {
glog.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, pvObj)
}
}
}
func (ctrl *PersistentVolumeController) syncVolume(volume *api.PersistentVolume) error {
glog.V(4).Infof("synchronizing PersistentVolume[%s], current phase: %s", volume.Name, volume.Status.Phase)
return nil
}
func (ctrl *PersistentVolumeController) syncClaim(claim *api.PersistentVolumeClaim) error {
glog.V(4).Infof("synchronizing PersistentVolumeClaim[%s], current phase: %s", claim.Name, claim.Status.Phase)
return nil
}
// Run starts all of this controller's control loops
func (ctrl *PersistentVolumeController) Run() {
glog.V(4).Infof("starting PersistentVolumeController")
if ctrl.volumeControllerStopCh == nil {
ctrl.volumeControllerStopCh = make(chan struct{})
go ctrl.volumeController.Run(ctrl.volumeControllerStopCh)
}
if ctrl.claimControllerStopCh == nil {
ctrl.claimControllerStopCh = make(chan struct{})
go ctrl.claimController.Run(ctrl.claimControllerStopCh)
}
}
// Stop gracefully shuts down this controller
func (ctrl *PersistentVolumeController) Stop() {
glog.V(4).Infof("stopping PersistentVolumeController")
close(ctrl.volumeControllerStopCh)
close(ctrl.claimControllerStopCh)
}