mirror of https://github.com/k3s-io/k3s
complete the controller context for init funcs
parent
b7f52a8c2e
commit
475916cc59
|
@ -37,7 +37,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
|
||||
"k8s.io/client-go/discovery"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
|
@ -55,13 +54,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
|
||||
routecontroller "k8s.io/kubernetes/pkg/controller/route"
|
||||
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
|
||||
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
|
||||
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/serviceaccount"
|
||||
"k8s.io/kubernetes/pkg/util/configz"
|
||||
|
||||
|
@ -175,10 +168,19 @@ func Run(s *options.CMServer) error {
|
|||
} else {
|
||||
clientBuilder = rootClientBuilder
|
||||
}
|
||||
ctx, err := CreateControllerContext(s, rootClientBuilder, clientBuilder, stop)
|
||||
if err != nil {
|
||||
glog.Fatalf("error building controller context: %v", err)
|
||||
}
|
||||
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
|
||||
|
||||
err := StartControllers(NewControllerInitializers(), s, rootClientBuilder, clientBuilder, stop)
|
||||
glog.Fatalf("error running controllers: %v", err)
|
||||
panic("unreachable")
|
||||
if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers()); err != nil {
|
||||
glog.Fatalf("error starting controllers: %v", err)
|
||||
}
|
||||
|
||||
ctx.InformerFactory.Start(ctx.Stop)
|
||||
|
||||
select {}
|
||||
}
|
||||
|
||||
if !s.LeaderElection.LeaderElect {
|
||||
|
@ -231,6 +233,10 @@ type ControllerContext struct {
|
|||
// AvailableResources is a map listing currently available resources
|
||||
AvailableResources map[schema.GroupVersionResource]bool
|
||||
|
||||
// Cloud is the cloud provider interface for the controllers to use.
|
||||
// It must be initialized and ready to use.
|
||||
Cloud cloudprovider.Interface
|
||||
|
||||
// Stop is the stop channel
|
||||
Stop <-chan struct{}
|
||||
}
|
||||
|
@ -272,16 +278,14 @@ type InitFunc func(ctx ControllerContext) (bool, error)
|
|||
func KnownControllers() []string {
|
||||
ret := sets.StringKeySet(NewControllerInitializers())
|
||||
|
||||
// add "special" controllers that aren't initialized normally. These controllers cannot be initialized
|
||||
// using a normal function. The only known special case is the SA token controller which *must* be started
|
||||
// first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding
|
||||
// to this list.
|
||||
ret.Insert(
|
||||
saTokenControllerName,
|
||||
nodeControllerName,
|
||||
serviceControllerName,
|
||||
routeControllerName,
|
||||
pvBinderControllerName,
|
||||
attachDetachControllerName,
|
||||
)
|
||||
|
||||
// add "special" controllers that aren't initialized normally
|
||||
return ret.List()
|
||||
}
|
||||
|
||||
|
@ -290,6 +294,10 @@ var ControllersDisabledByDefault = sets.NewString(
|
|||
"tokencleaner",
|
||||
)
|
||||
|
||||
const (
|
||||
saTokenControllerName = "serviceaccount-token"
|
||||
)
|
||||
|
||||
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
|
||||
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
|
||||
func NewControllerInitializers() map[string]InitFunc {
|
||||
|
@ -314,6 +322,11 @@ func NewControllerInitializers() map[string]InitFunc {
|
|||
controllers["ttl"] = startTTLController
|
||||
controllers["bootstrapsigner"] = startBootstrapSignerController
|
||||
controllers["tokencleaner"] = startTokenCleanerController
|
||||
controllers["service"] = startServiceController
|
||||
controllers["node"] = startNodeController
|
||||
controllers["route"] = startRouteController
|
||||
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
|
||||
controllers["attachdetach"] = startAttachDetachController
|
||||
|
||||
return controllers
|
||||
}
|
||||
|
@ -366,61 +379,22 @@ func GetAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma
|
|||
return allResources, nil
|
||||
}
|
||||
|
||||
const (
|
||||
saTokenControllerName = "serviceaccount-token"
|
||||
nodeControllerName = "node"
|
||||
serviceControllerName = "service"
|
||||
routeControllerName = "route"
|
||||
pvBinderControllerName = "persistentvolume-binder"
|
||||
attachDetachControllerName = "attachdetach"
|
||||
)
|
||||
|
||||
func StartControllers(controllers map[string]InitFunc, s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
|
||||
func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
|
||||
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
|
||||
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
|
||||
|
||||
// always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
|
||||
if len(s.ServiceAccountKeyFile) > 0 && IsControllerEnabled(saTokenControllerName, ControllersDisabledByDefault, s.Controllers...) {
|
||||
privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)
|
||||
availableResources, err := GetAvailableResources(rootClientBuilder)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error reading key for service account token controller: %v", err)
|
||||
} else {
|
||||
var rootCA []byte
|
||||
if s.RootCAFile != "" {
|
||||
rootCA, err = ioutil.ReadFile(s.RootCAFile)
|
||||
return ControllerContext{}, err
|
||||
}
|
||||
|
||||
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error reading root-ca-file at %s: %v", s.RootCAFile, err)
|
||||
return ControllerContext{}, fmt.Errorf("cloud provider could not be initialized: %v", err)
|
||||
}
|
||||
if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
|
||||
return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err)
|
||||
}
|
||||
} else {
|
||||
rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
|
||||
}
|
||||
|
||||
controller := serviceaccountcontroller.NewTokensController(
|
||||
sharedInformers.Core().V1().ServiceAccounts(),
|
||||
sharedInformers.Core().V1().Secrets(),
|
||||
rootClientBuilder.ClientOrDie("tokens-controller"),
|
||||
serviceaccountcontroller.TokensControllerOptions{
|
||||
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
|
||||
RootCA: rootCA,
|
||||
},
|
||||
)
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
go controller.Run(int(s.ConcurrentSATokenSyncs), stop)
|
||||
|
||||
// start the first set of informers now so that other controllers can start
|
||||
sharedInformers.Start(stop)
|
||||
}
|
||||
|
||||
} else {
|
||||
glog.Warningf("%q is disabled", saTokenControllerName)
|
||||
}
|
||||
|
||||
availableResources, err := GetAvailableResources(clientBuilder)
|
||||
if err != nil {
|
||||
return err
|
||||
if cloud != nil {
|
||||
// Initialize the cloud provider with a reference to the clientBuilder
|
||||
cloud.Initialize(rootClientBuilder)
|
||||
}
|
||||
|
||||
ctx := ControllerContext{
|
||||
|
@ -428,8 +402,18 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
|
|||
InformerFactory: sharedInformers,
|
||||
Options: *s,
|
||||
AvailableResources: availableResources,
|
||||
Cloud: cloud,
|
||||
Stop: stop,
|
||||
}
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
|
||||
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
|
||||
// If this fails, just return here and fail since other controllers won't be able to get credentials.
|
||||
if _, err := startSATokenController(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for controllerName, initFn := range controllers {
|
||||
if !ctx.IsControllerEnabled(controllerName) {
|
||||
|
@ -437,7 +421,7 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
|
|||
continue
|
||||
}
|
||||
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
time.Sleep(wait.Jitter(ctx.Options.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
|
||||
glog.V(1).Infof("Starting %q", controllerName)
|
||||
started, err := initFn(ctx)
|
||||
|
@ -452,144 +436,57 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
|
|||
glog.Infof("Started %q", controllerName)
|
||||
}
|
||||
|
||||
// all the remaining plugins want this cloud variable
|
||||
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cloud provider could not be initialized: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if cloud != nil {
|
||||
// Initialize the cloud provider with a reference to the clientBuilder
|
||||
cloud.Initialize(clientBuilder)
|
||||
// serviceAccountTokenControllerStarter is special because it must run first to set up permissions for other controllers.
|
||||
// It cannot use the "normal" client builder, so it tracks its own. It must also avoid being included in the "normal"
|
||||
// init map so that it can always run first.
|
||||
type serviceAccountTokenControllerStarter struct {
|
||||
rootClientBuilder controller.ControllerClientBuilder
|
||||
}
|
||||
|
||||
if ctx.IsControllerEnabled(nodeControllerName) {
|
||||
_, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR)
|
||||
if err != nil {
|
||||
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err)
|
||||
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (bool, error) {
|
||||
if !ctx.IsControllerEnabled(saTokenControllerName) {
|
||||
glog.Warningf("%q is disabled", saTokenControllerName)
|
||||
return false, nil
|
||||
}
|
||||
_, serviceCIDR, err := net.ParseCIDR(s.ServiceCIDR)
|
||||
if err != nil {
|
||||
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
|
||||
|
||||
if len(ctx.Options.ServiceAccountKeyFile) == 0 {
|
||||
glog.Warningf("%q is disabled because there is no private key", saTokenControllerName)
|
||||
return false, nil
|
||||
}
|
||||
nodeController, err := nodecontroller.NewNodeController(
|
||||
sharedInformers.Core().V1().Pods(),
|
||||
sharedInformers.Core().V1().Nodes(),
|
||||
sharedInformers.Extensions().V1beta1().DaemonSets(),
|
||||
cloud,
|
||||
clientBuilder.ClientOrDie("node-controller"),
|
||||
s.PodEvictionTimeout.Duration,
|
||||
s.NodeEvictionRate,
|
||||
s.SecondaryNodeEvictionRate,
|
||||
s.LargeClusterSizeThreshold,
|
||||
s.UnhealthyZoneThreshold,
|
||||
s.NodeMonitorGracePeriod.Duration,
|
||||
s.NodeStartupGracePeriod.Duration,
|
||||
s.NodeMonitorPeriod.Duration,
|
||||
clusterCIDR,
|
||||
serviceCIDR,
|
||||
int(s.NodeCIDRMaskSize),
|
||||
s.AllocateNodeCIDRs,
|
||||
nodecontroller.CIDRAllocatorType(s.CIDRAllocatorType),
|
||||
s.EnableTaintManager,
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
|
||||
privateKey, err := serviceaccount.ReadPrivateKey(ctx.Options.ServiceAccountKeyFile)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("error reading key for service account token controller: %v", err)
|
||||
}
|
||||
|
||||
var rootCA []byte
|
||||
if ctx.Options.RootCAFile != "" {
|
||||
rootCA, err = ioutil.ReadFile(ctx.Options.RootCAFile)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("error reading root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
|
||||
}
|
||||
if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
|
||||
return true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
|
||||
}
|
||||
} else {
|
||||
rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData
|
||||
}
|
||||
|
||||
controller := serviceaccountcontroller.NewTokensController(
|
||||
ctx.InformerFactory.Core().V1().ServiceAccounts(),
|
||||
ctx.InformerFactory.Core().V1().Secrets(),
|
||||
c.rootClientBuilder.ClientOrDie("tokens-controller"),
|
||||
serviceaccountcontroller.TokensControllerOptions{
|
||||
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
|
||||
RootCA: rootCA,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize nodecontroller: %v", err)
|
||||
}
|
||||
go nodeController.Run(stop)
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
} else {
|
||||
glog.Warningf("%q is disabled", nodeControllerName)
|
||||
}
|
||||
go controller.Run(int(ctx.Options.ConcurrentSATokenSyncs), ctx.Stop)
|
||||
|
||||
if ctx.IsControllerEnabled(serviceControllerName) {
|
||||
serviceController, err := servicecontroller.New(
|
||||
cloud,
|
||||
clientBuilder.ClientOrDie("service-controller"),
|
||||
sharedInformers.Core().V1().Services(),
|
||||
sharedInformers.Core().V1().Nodes(),
|
||||
s.ClusterName,
|
||||
)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to start service controller: %v", err)
|
||||
} else {
|
||||
go serviceController.Run(stop, int(s.ConcurrentServiceSyncs))
|
||||
}
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
} else {
|
||||
glog.Warningf("%q is disabled", serviceControllerName)
|
||||
}
|
||||
// start the first set of informers now so that other controllers can start
|
||||
ctx.InformerFactory.Start(ctx.Stop)
|
||||
|
||||
if ctx.IsControllerEnabled(routeControllerName) {
|
||||
_, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR)
|
||||
if err != nil {
|
||||
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err)
|
||||
}
|
||||
if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes {
|
||||
if cloud == nil {
|
||||
glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
|
||||
} else if routes, ok := cloud.Routes(); !ok {
|
||||
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
|
||||
} else {
|
||||
routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), sharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR)
|
||||
go routeController.Run(stop, s.RouteReconciliationPeriod.Duration)
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
}
|
||||
} else {
|
||||
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
|
||||
}
|
||||
} else {
|
||||
glog.Warningf("%q is disabled", routeControllerName)
|
||||
}
|
||||
|
||||
if ctx.IsControllerEnabled(pvBinderControllerName) {
|
||||
params := persistentvolumecontroller.ControllerParameters{
|
||||
KubeClient: clientBuilder.ClientOrDie("persistent-volume-binder"),
|
||||
SyncPeriod: s.PVClaimBinderSyncPeriod.Duration,
|
||||
VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
|
||||
Cloud: cloud,
|
||||
ClusterName: s.ClusterName,
|
||||
VolumeInformer: sharedInformers.Core().V1().PersistentVolumes(),
|
||||
ClaimInformer: sharedInformers.Core().V1().PersistentVolumeClaims(),
|
||||
ClassInformer: sharedInformers.Storage().V1().StorageClasses(),
|
||||
EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
|
||||
}
|
||||
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
|
||||
if volumeControllerErr != nil {
|
||||
return fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
|
||||
}
|
||||
go volumeController.Run(stop)
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
} else {
|
||||
glog.Warningf("%q is disabled", pvBinderControllerName)
|
||||
}
|
||||
|
||||
if ctx.IsControllerEnabled(attachDetachControllerName) {
|
||||
if s.ReconcilerSyncLoopPeriod.Duration < time.Second {
|
||||
return fmt.Errorf("Duration time must be greater than one second as set via command line option reconcile-sync-loop-period.")
|
||||
}
|
||||
attachDetachController, attachDetachControllerErr :=
|
||||
attachdetach.NewAttachDetachController(
|
||||
clientBuilder.ClientOrDie("attachdetach-controller"),
|
||||
sharedInformers.Core().V1().Pods(),
|
||||
sharedInformers.Core().V1().Nodes(),
|
||||
sharedInformers.Core().V1().PersistentVolumeClaims(),
|
||||
sharedInformers.Core().V1().PersistentVolumes(),
|
||||
cloud,
|
||||
ProbeAttachableVolumePlugins(s.VolumeConfiguration),
|
||||
s.DisableAttachDetachReconcilerSync,
|
||||
s.ReconcilerSyncLoopPeriod.Duration)
|
||||
if attachDetachControllerErr != nil {
|
||||
return fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
|
||||
}
|
||||
go attachDetachController.Run(stop)
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
} else {
|
||||
glog.Warningf("%q is disabled", attachDetachControllerName)
|
||||
}
|
||||
|
||||
sharedInformers.Start(stop)
|
||||
|
||||
select {}
|
||||
return true, nil
|
||||
}
|
||||
|
|
|
@ -22,10 +22,15 @@ package app
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
|
@ -37,14 +42,140 @@ import (
|
|||
"k8s.io/kubernetes/pkg/controller/garbagecollector"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
|
||||
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
|
||||
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
|
||||
"k8s.io/kubernetes/pkg/controller/podgc"
|
||||
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
|
||||
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
|
||||
routecontroller "k8s.io/kubernetes/pkg/controller/route"
|
||||
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
|
||||
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
|
||||
ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
|
||||
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
quotainstall "k8s.io/kubernetes/pkg/quota/install"
|
||||
)
|
||||
|
||||
func startServiceController(ctx ControllerContext) (bool, error) {
|
||||
serviceController, err := servicecontroller.New(
|
||||
ctx.Cloud,
|
||||
ctx.ClientBuilder.ClientOrDie("service-controller"),
|
||||
ctx.InformerFactory.Core().V1().Services(),
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
ctx.Options.ClusterName,
|
||||
)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to start service controller: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
go serviceController.Run(ctx.Stop, int(ctx.Options.ConcurrentServiceSyncs))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func startNodeController(ctx ControllerContext) (bool, error) {
|
||||
_, clusterCIDR, err := net.ParseCIDR(ctx.Options.ClusterCIDR)
|
||||
if err != nil {
|
||||
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", ctx.Options.ClusterCIDR, err)
|
||||
}
|
||||
_, serviceCIDR, err := net.ParseCIDR(ctx.Options.ServiceCIDR)
|
||||
if err != nil {
|
||||
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.Options.ServiceCIDR, err)
|
||||
}
|
||||
nodeController, err := nodecontroller.NewNodeController(
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
ctx.InformerFactory.Extensions().V1beta1().DaemonSets(),
|
||||
ctx.Cloud,
|
||||
ctx.ClientBuilder.ClientOrDie("node-controller"),
|
||||
ctx.Options.PodEvictionTimeout.Duration,
|
||||
ctx.Options.NodeEvictionRate,
|
||||
ctx.Options.SecondaryNodeEvictionRate,
|
||||
ctx.Options.LargeClusterSizeThreshold,
|
||||
ctx.Options.UnhealthyZoneThreshold,
|
||||
ctx.Options.NodeMonitorGracePeriod.Duration,
|
||||
ctx.Options.NodeStartupGracePeriod.Duration,
|
||||
ctx.Options.NodeMonitorPeriod.Duration,
|
||||
clusterCIDR,
|
||||
serviceCIDR,
|
||||
int(ctx.Options.NodeCIDRMaskSize),
|
||||
ctx.Options.AllocateNodeCIDRs,
|
||||
nodecontroller.CIDRAllocatorType(ctx.Options.CIDRAllocatorType),
|
||||
ctx.Options.EnableTaintManager,
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
|
||||
)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
go nodeController.Run(ctx.Stop)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func startRouteController(ctx ControllerContext) (bool, error) {
|
||||
_, clusterCIDR, err := net.ParseCIDR(ctx.Options.ClusterCIDR)
|
||||
if err != nil {
|
||||
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", ctx.Options.ClusterCIDR, err)
|
||||
}
|
||||
// TODO demorgans
|
||||
if ctx.Options.AllocateNodeCIDRs && ctx.Options.ConfigureCloudRoutes {
|
||||
if ctx.Cloud == nil {
|
||||
glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
|
||||
return false, nil
|
||||
} else if routes, ok := ctx.Cloud.Routes(); !ok {
|
||||
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
|
||||
return false, nil
|
||||
} else {
|
||||
routeController := routecontroller.New(routes, ctx.ClientBuilder.ClientOrDie("route-controller"), ctx.InformerFactory.Core().V1().Nodes(), ctx.Options.ClusterName, clusterCIDR)
|
||||
go routeController.Run(ctx.Stop, ctx.Options.RouteReconciliationPeriod.Duration)
|
||||
return true, nil
|
||||
}
|
||||
} else {
|
||||
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.Options.AllocateNodeCIDRs, ctx.Options.ConfigureCloudRoutes)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
func startPersistentVolumeBinderController(ctx ControllerContext) (bool, error) {
|
||||
params := persistentvolumecontroller.ControllerParameters{
|
||||
KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
|
||||
SyncPeriod: ctx.Options.PVClaimBinderSyncPeriod.Duration,
|
||||
VolumePlugins: ProbeControllerVolumePlugins(ctx.Cloud, ctx.Options.VolumeConfiguration),
|
||||
Cloud: ctx.Cloud,
|
||||
ClusterName: ctx.Options.ClusterName,
|
||||
VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
ClaimInformer: ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
ClassInformer: ctx.InformerFactory.Storage().V1().StorageClasses(),
|
||||
EnableDynamicProvisioning: ctx.Options.VolumeConfiguration.EnableDynamicProvisioning,
|
||||
}
|
||||
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
|
||||
if volumeControllerErr != nil {
|
||||
return true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
|
||||
}
|
||||
go volumeController.Run(ctx.Stop)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func startAttachDetachController(ctx ControllerContext) (bool, error) {
|
||||
if ctx.Options.ReconcilerSyncLoopPeriod.Duration < time.Second {
|
||||
return true, fmt.Errorf("Duration time must be greater than one second as set via command line option reconcile-sync-loop-period.")
|
||||
}
|
||||
attachDetachController, attachDetachControllerErr :=
|
||||
attachdetach.NewAttachDetachController(
|
||||
ctx.ClientBuilder.ClientOrDie("attachdetach-controller"),
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
ctx.InformerFactory.Core().V1().Nodes(),
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
ctx.Cloud,
|
||||
ProbeAttachableVolumePlugins(ctx.Options.VolumeConfiguration),
|
||||
ctx.Options.DisableAttachDetachReconcilerSync,
|
||||
ctx.Options.ReconcilerSyncLoopPeriod.Duration)
|
||||
if attachDetachControllerErr != nil {
|
||||
return true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
|
||||
}
|
||||
go attachDetachController.Run(ctx.Stop)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func startEndpointController(ctx ControllerContext) (bool, error) {
|
||||
go endpointcontroller.NewEndpointController(
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
|
|
Loading…
Reference in New Issue