refactor code logic for cloud-controller manager

pull/564/head
stewart-yu 2019-01-14 19:49:47 +08:00
parent bbd992df13
commit 72729db87f
5 changed files with 180 additions and 107 deletions

View File

@ -19,10 +19,8 @@ package app
import (
"context"
"fmt"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/spf13/cobra"
@ -34,7 +32,6 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
apiserverflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/apiserver/pkg/util/globalflag"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
cloudprovider "k8s.io/cloud-provider"
@ -43,9 +40,6 @@ import (
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
genericcontrollermanager "k8s.io/kubernetes/cmd/controller-manager/app"
cmoptions "k8s.io/kubernetes/cmd/controller-manager/app/options"
cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
"k8s.io/kubernetes/pkg/util/configz"
utilflag "k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/version"
@ -165,7 +159,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
}
run := func(ctx context.Context) {
if err := startControllers(c, ctx.Done(), cloud); err != nil {
if err := startControllers(c, ctx.Done(), cloud, newControllerInitializers()); err != nil {
klog.Fatalf("error running controllers: %v", err)
}
}
@ -215,87 +209,40 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
}
// startControllers starts the cloud specific controller loops.
func startControllers(c *cloudcontrollerconfig.CompletedConfig, stop <-chan struct{}, cloud cloudprovider.Interface) error {
// Function to build the kube client object
client := func(serviceAccountName string) kubernetes.Interface {
return c.ClientBuilder.ClientOrDie(serviceAccountName)
}
func startControllers(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, cloud cloudprovider.Interface, controllers map[string]initFunc) error {
if cloud != nil {
// Initialize the cloud provider with a reference to the clientBuilder
cloud.Initialize(c.ClientBuilder, stop)
}
// Start the CloudNodeController
nodeController := cloudcontrollers.NewCloudNodeController(
c.SharedInformers.Core().V1().Nodes(),
// cloud node controller uses existing cluster role from node-controller
client("node-controller"), cloud,
c.ComponentConfig.NodeStatusUpdateFrequency.Duration)
go nodeController.Run(stop)
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
cloudNodeLifecycleController, err := cloudcontrollers.NewCloudNodeLifecycleController(
c.SharedInformers.Core().V1().Nodes(),
// cloud node lifecycle controller uses existing cluster role from node-controller
client("node-controller"), cloud,
c.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
)
if err != nil {
klog.Errorf("failed to start cloud node lifecycle controller: %s", err)
} else {
go cloudNodeLifecycleController.Run(stop)
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
cloud.Initialize(c.ClientBuilder, stopCh)
}
// Start the PersistentVolumeLabelController
pvlController := cloudcontrollers.NewPersistentVolumeLabelController(client("pvl-controller"), cloud)
go pvlController.Run(5, stop)
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
// Start the service controller
serviceController, err := servicecontroller.New(
cloud,
client("service-controller"),
c.SharedInformers.Core().V1().Services(),
c.SharedInformers.Core().V1().Nodes(),
c.ComponentConfig.KubeCloudShared.ClusterName,
)
if err != nil {
klog.Errorf("Failed to start service controller: %v", err)
} else {
go serviceController.Run(stop, int(c.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
}
// If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller
if c.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs && c.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
if routes, ok := cloud.Routes(); !ok {
klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
} else {
var clusterCIDR *net.IPNet
if len(strings.TrimSpace(c.ComponentConfig.KubeCloudShared.ClusterCIDR)) != 0 {
_, clusterCIDR, err = net.ParseCIDR(c.ComponentConfig.KubeCloudShared.ClusterCIDR)
if err != nil {
klog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", c.ComponentConfig.KubeCloudShared.ClusterCIDR, err)
}
}
routeController := routecontroller.New(routes, client("route-controller"), c.SharedInformers.Core().V1().Nodes(), c.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDR)
go routeController.Run(stop, c.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
for controllerName, initFn := range controllers {
if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) {
klog.Warningf("%q is disabled", controllerName)
continue
}
} else {
klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", c.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, c.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
klog.V(1).Infof("Starting %q", controllerName)
_, started, err := initFn(c, cloud, stopCh)
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
}
if !started {
klog.Warningf("Skipping %q", controllerName)
continue
}
klog.Infof("Started %q", controllerName)
time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
}
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
err = genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second)
if err != nil {
if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil {
klog.Fatalf("Failed to wait for apiserver being healthy: %v", err)
}
c.SharedInformers.Start(stop)
c.SharedInformers.Start(stopCh)
select {}
}

View File

@ -0,0 +1,131 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes node controllers, service and
// route controller, and so on.
//
package app
import (
"net"
"net/http"
"strings"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config"
cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
)
func startCloudNodeController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
// Start the CloudNodeController
nodeController := cloudcontrollers.NewCloudNodeController(
ctx.SharedInformers.Core().V1().Nodes(),
// cloud node controller uses existing cluster role from node-controller
ctx.ClientBuilder.ClientOrDie("node-controller"),
cloud,
ctx.ComponentConfig.NodeStatusUpdateFrequency.Duration)
go nodeController.Run(stopCh)
return nil, true, nil
}
func startCloudNodeLifecycleController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
// Start the cloudNodeLifecycleController
cloudNodeLifecycleController, err := cloudcontrollers.NewCloudNodeLifecycleController(
ctx.SharedInformers.Core().V1().Nodes(),
// cloud node lifecycle controller uses existing cluster role from node-controller
ctx.ClientBuilder.ClientOrDie("node-controller"),
cloud,
ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
)
if err != nil {
klog.Warningf("failed to start cloud node lifecycle controller: %s", err)
return nil, false, nil
}
go cloudNodeLifecycleController.Run(stopCh)
return nil, true, nil
}
func startPersistentVolumeLabelController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
// Start the PersistentVolumeLabelController
pvlController := cloudcontrollers.NewPersistentVolumeLabelController(
ctx.ClientBuilder.ClientOrDie("pvl-controller"),
cloud,
)
go pvlController.Run(5, stopCh)
return nil, true, nil
}
func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
// Start the service controller
serviceController, err := servicecontroller.New(
cloud,
ctx.ClientBuilder.ClientOrDie("service-controller"),
ctx.SharedInformers.Core().V1().Services(),
ctx.SharedInformers.Core().V1().Nodes(),
ctx.ComponentConfig.KubeCloudShared.ClusterName,
)
if err != nil {
// This error shouldn't fail. It lives like this as a legacy.
klog.Errorf("Failed to start service controller: %v", err)
return nil, false, nil
}
go serviceController.Run(stopCh, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs))
return nil, true, nil
}
func startRouteController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
return nil, false, nil
}
// If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller
routes, ok := cloud.Routes()
if !ok {
klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
return nil, false, nil
}
var clusterCIDR *net.IPNet
var err error
if len(strings.TrimSpace(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)) != 0 {
_, clusterCIDR, err = net.ParseCIDR(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
if err != nil {
klog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", ctx.ComponentConfig.KubeCloudShared.ClusterCIDR, err)
}
}
routeController := routecontroller.New(
routes,
ctx.ClientBuilder.ClientOrDie("route-controller"),
ctx.SharedInformers.Core().V1().Nodes(),
ctx.ComponentConfig.KubeCloudShared.ClusterName,
clusterCIDR,
)
go routeController.Run(stopCh, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
return nil, true, nil
}

View File

@ -21,6 +21,7 @@ import (
"net/http"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"
@ -53,3 +54,26 @@ func WaitForAPIServer(client clientset.Interface, timeout time.Duration) error {
return nil
}
// IsControllerEnabled check if a specified controller enabled or not.
func IsControllerEnabled(name string, disabledByDefaultControllers sets.String, controllers []string) bool {
hasStar := false
for _, ctrl := range controllers {
if ctrl == name {
return true
}
if ctrl == "-"+name {
return false
}
if ctrl == "*" {
hasStar = true
}
}
// if we get here, there was no explicit choice
if !hasStar {
// nothing on by default
return false
}
return !disabledByDefaultControllers.Has(name)
}

View File

@ -14,10 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package app implements a server that runs a set of active
// components. This includes replication controllers, service endpoints and
// nodes.
//
package app
import (
@ -74,7 +70,7 @@ func TestIsControllerEnabled(t *testing.T) {
}
for _, tc := range tcs {
actual := IsControllerEnabled(tc.controllerName, sets.NewString(tc.disabledByDefaultControllers...), tc.controllers...)
actual := IsControllerEnabled(tc.controllerName, sets.NewString(tc.disabledByDefaultControllers...), tc.controllers)
assert.Equal(t, tc.expected, actual, "%v: expected %v, got %v", tc.name, tc.expected, actual)
}

View File

@ -301,32 +301,7 @@ type ControllerContext struct {
}
func (c ControllerContext) IsControllerEnabled(name string) bool {
return IsControllerEnabled(name, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers...)
}
func IsControllerEnabled(name string, disabledByDefaultControllers sets.String, controllers ...string) bool {
hasStar := false
for _, ctrl := range controllers {
if ctrl == name {
return true
}
if ctrl == "-"+name {
return false
}
if ctrl == "*" {
hasStar = true
}
}
// if we get here, there was no explicit choice
if !hasStar {
// nothing on by default
return false
}
if disabledByDefaultControllers.Has(name) {
return false
}
return true
return genericcontrollermanager.IsControllerEnabled(name, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers)
}
// InitFunc is used to launch a particular controller. It may run additional "should I activate checks".