mirror of https://github.com/k3s-io/k3s
Delete route controller
parent: 9422d1d788
commit: 9b2f75c001
@@ -274,6 +274,5 @@ func newControllerInitializers() map[string]initFunc {
	controllers["cloud-node"] = startCloudNodeController
	controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
	controllers["service"] = startServiceController
	controllers["route"] = startRouteController
	return controllers
}

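For orientation: both controller managers build a name-to-init-function map like the one above and iterate it at startup, treating every controller uniformly. A minimal, self-contained sketch of that registry pattern (names are hypothetical, not the actual kube-controller-manager code):

    package main

    import "fmt"

    // initFunc mirrors the shape of the registry above: every controller
    // is started through one uniform function type.
    type initFunc func() error

    func main() {
        controllers := map[string]initFunc{
            "cloud-node": func() error { fmt.Println("starting cloud-node"); return nil },
            "service":    func() error { fmt.Println("starting service"); return nil },
        }
        for name, start := range controllers {
            if err := start(); err != nil {
                fmt.Printf("failed to start %q: %v\n", name, err)
            }
        }
    }

Deleting the "route" entry from the map, as this commit does, is all it takes to stop the controller manager from ever launching that loop.
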
@@ -21,16 +21,12 @@ limitations under the License.
package app

import (
	"net"
	"net/http"
	"strings"

	cloudprovider "k8s.io/cloud-provider"
	"k8s.io/cloud-provider"
	"k8s.io/klog"
	cloudcontrollerconfig "k8s.io/kubernetes/cmd/cloud-controller-manager/app/config"
	cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud"
	routecontroller "k8s.io/kubernetes/pkg/controller/route"
	servicecontroller "k8s.io/kubernetes/pkg/controller/service"
	"net/http"
)

func startCloudNodeController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {

@@ -85,36 +81,3 @@ func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cl

	return nil, true, nil
}

func startRouteController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
	if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
		klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
		return nil, false, nil
	}

	// If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller
	routes, ok := cloud.Routes()
	if !ok {
		klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
		return nil, false, nil
	}
	var clusterCIDR *net.IPNet
	var err error
	if len(strings.TrimSpace(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)) != 0 {
		_, clusterCIDR, err = net.ParseCIDR(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
		if err != nil {
			klog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", ctx.ComponentConfig.KubeCloudShared.ClusterCIDR, err)
		}
	}

	routeController := routecontroller.New(
		routes,
		ctx.ClientBuilder.ClientOrDie("route-controller"),
		ctx.SharedInformers.Core().V1().Nodes(),
		ctx.ComponentConfig.KubeCloudShared.ClusterName,
		clusterCIDR,
	)
	go routeController.Run(stopCh, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)

	return nil, true, nil
}

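Aside: the deleted startRouteController only parses the cluster CIDR when the flag is non-empty, and it keeps just the *net.IPNet half of the result. A small runnable illustration of net.ParseCIDR (the 10.42.0.0/16 value is an arbitrary example, not taken from this commit):

    package main

    import (
        "fmt"
        "net"
    )

    func main() {
        // ParseCIDR returns the address, the network it belongs to, and an error.
        // The route controller keeps only the *net.IPNet network.
        _, clusterCIDR, err := net.ParseCIDR("10.42.0.0/16")
        if err != nil {
            fmt.Println("unparsable CIDR:", err)
            return
        }
        fmt.Println(clusterCIDR)                                    // 10.42.0.0/16
        fmt.Println(clusterCIDR.Contains(net.ParseIP("10.42.3.9"))) // true
        fmt.Println(clusterCIDR.Contains(net.ParseIP("10.43.0.1"))) // false
    }
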
@@ -367,7 +367,6 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
	controllers["nodelifecycle"] = startNodeLifecycleController
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
		// TODO: volume controller into the IncludeCloudLoops only set.
	}

@@ -48,7 +48,6 @@ import (
	"k8s.io/kubernetes/pkg/controller/podgc"
	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
	resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
	routecontroller "k8s.io/kubernetes/pkg/controller/route"
	servicecontroller "k8s.io/kubernetes/pkg/controller/service"
	serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
	ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"

@@ -166,29 +165,6 @@ func startCloudNodeLifecycleController(ctx ControllerContext) (http.Handler, boo
	return nil, true, nil
}

func startRouteController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
		klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
		return nil, false, nil
	}
	if ctx.Cloud == nil {
		klog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
		return nil, false, nil
	}
	routes, ok := ctx.Cloud.Routes()
	if !ok {
		klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
		return nil, false, nil
	}
	_, clusterCIDR, err := net.ParseCIDR(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR)
	if err != nil {
		klog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", ctx.ComponentConfig.KubeCloudShared.ClusterCIDR, err)
	}
	routeController := routecontroller.New(routes, ctx.ClientBuilder.ClientOrDie("route-controller"), ctx.InformerFactory.Core().V1().Nodes(), ctx.ComponentConfig.KubeCloudShared.ClusterName, clusterCIDR)
	go routeController.Run(ctx.Stop, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration)
	return nil, true, nil
}

func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
	params := persistentvolumecontroller.ControllerParameters{
		KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),

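Both start functions above follow the controller managers' shared init-function return convention: an optional http.Handler (exposed on the controller manager's debug endpoints when non-nil), a bool reporting whether the controller was actually enabled, and an error reserved for real failures. A minimal sketch of that convention (hypothetical controller; the semantics described here are my reading of the code above, not part of this commit):

    package main

    import (
        "fmt"
        "net/http"
    )

    // startExampleController follows the (http.Handler, bool, error) shape:
    // nil handler (no debug endpoint), an enabled flag, and a fatal error.
    func startExampleController(configured bool, stopCh <-chan struct{}) (http.Handler, bool, error) {
        if !configured {
            // Disabled by configuration: not an error, just report enabled=false,
            // exactly like the AllocateNodeCIDRs/ConfigureCloudRoutes guard above.
            return nil, false, nil
        }
        go func() {
            <-stopCh // stand-in for the controller's run loop
        }()
        return nil, true, nil
    }

    func main() {
        stopCh := make(chan struct{})
        defer close(stopCh)
        _, enabled, err := startExampleController(false, stopCh)
        fmt.Println(enabled, err) // false <nil>
    }
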
@@ -1,69 +0,0 @@
package(default_visibility = ["//visibility:public"])

load(
    "@io_bazel_rules_go//go:def.bzl",
    "go_library",
    "go_test",
)

go_library(
    name = "go_default_library",
    srcs = [
        "doc.go",
        "route_controller.go",
    ],
    importpath = "k8s.io/kubernetes/pkg/controller/route",
    deps = [
        "//pkg/controller:go_default_library",
        "//pkg/controller/util/node:go_default_library",
        "//pkg/util/metrics:go_default_library",
        "//pkg/util/node:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
        "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
        "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
        "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
        "//staging/src/k8s.io/client-go/tools/record:go_default_library",
        "//staging/src/k8s.io/client-go/util/retry:go_default_library",
        "//staging/src/k8s.io/cloud-provider:go_default_library",
        "//vendor/k8s.io/klog:go_default_library",
    ],
)

go_test(
    name = "go_default_test",
    srcs = ["route_controller_test.go"],
    embed = [":go_default_library"],
    deps = [
        "//pkg/cloudprovider/providers/fake:go_default_library",
        "//pkg/controller:go_default_library",
        "//pkg/controller/util/node:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
        "//staging/src/k8s.io/client-go/informers:go_default_library",
        "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
        "//staging/src/k8s.io/client-go/testing:go_default_library",
        "//staging/src/k8s.io/cloud-provider:go_default_library",
    ],
)

filegroup(
    name = "package-srcs",
    srcs = glob(["**"]),
    tags = ["automanaged"],
    visibility = ["//visibility:private"],
)

filegroup(
    name = "all-srcs",
    srcs = [":package-srcs"],
    tags = ["automanaged"],
)

@@ -1,13 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners

approvers:
- gmarek
- wojtek-t
- bowei
- andrewsykim
- cheftako
reviewers:
- gmarek
- wojtek-t
- andrewsykim
- cheftako

@@ -1,19 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package route contains code for syncing cloud routing rules with
// the list of registered nodes.
package route // import "k8s.io/kubernetes/pkg/controller/route"

@@ -1,284 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package route

import (
	"context"
	"fmt"
	"net"
	"sync"
	"time"

	"k8s.io/klog"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	clientretry "k8s.io/client-go/util/retry"
	cloudprovider "k8s.io/cloud-provider"
	"k8s.io/kubernetes/pkg/controller"
	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
	"k8s.io/kubernetes/pkg/util/metrics"
	utilnode "k8s.io/kubernetes/pkg/util/node"
)

const (
	// Maximal number of concurrent CreateRoute API calls.
	// TODO: This should be per-provider.
	maxConcurrentRouteCreations int = 200
)

var updateNetworkConditionBackoff = wait.Backoff{
	Steps:    5, // Maximum number of retries.
	Duration: 100 * time.Millisecond,
	Jitter:   1.0,
}

type RouteController struct {
	routes           cloudprovider.Routes
	kubeClient       clientset.Interface
	clusterName      string
	clusterCIDR      *net.IPNet
	nodeLister       corelisters.NodeLister
	nodeListerSynced cache.InformerSynced
	broadcaster      record.EventBroadcaster
	recorder         record.EventRecorder
}

func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInformer coreinformers.NodeInformer, clusterName string, clusterCIDR *net.IPNet) *RouteController {
	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("route_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	if clusterCIDR == nil {
		klog.Fatal("RouteController: Must specify clusterCIDR.")
	}

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"})

	rc := &RouteController{
		routes:           routes,
		kubeClient:       kubeClient,
		clusterName:      clusterName,
		clusterCIDR:      clusterCIDR,
		nodeLister:       nodeInformer.Lister(),
		nodeListerSynced: nodeInformer.Informer().HasSynced,
		broadcaster:      eventBroadcaster,
		recorder:         recorder,
	}

	return rc
}

func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration) {
	defer utilruntime.HandleCrash()

	klog.Info("Starting route controller")
	defer klog.Info("Shutting down route controller")

	if !controller.WaitForCacheSync("route", stopCh, rc.nodeListerSynced) {
		return
	}

	if rc.broadcaster != nil {
		rc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rc.kubeClient.CoreV1().Events("")})
	}

	// TODO: If we do just the full Resync every 5 minutes (default value)
	// that means that we may wait up to 5 minutes before even starting
	// creating a route for it. This is bad.
	// We should have a watch on node and if we observe a new node (with CIDR?)
	// trigger reconciliation for that node.
	go wait.NonSlidingUntil(func() {
		if err := rc.reconcileNodeRoutes(); err != nil {
			klog.Errorf("Couldn't reconcile node routes: %v", err)
		}
	}, syncPeriod, stopCh)

	<-stopCh
}

func (rc *RouteController) reconcileNodeRoutes() error {
	routeList, err := rc.routes.ListRoutes(context.TODO(), rc.clusterName)
	if err != nil {
		return fmt.Errorf("error listing routes: %v", err)
	}
	nodes, err := rc.nodeLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("error listing nodes: %v", err)
	}
	return rc.reconcile(nodes, routeList)
}

func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.Route) error {
	// nodeCIDRs maps nodeName->nodeCIDR
	nodeCIDRs := make(map[types.NodeName]string)
	// routeMap maps routeTargetNode->route
	routeMap := make(map[types.NodeName]*cloudprovider.Route)
	for _, route := range routes {
		if route.TargetNode != "" {
			routeMap[route.TargetNode] = route
		}
	}

	wg := sync.WaitGroup{}
	rateLimiter := make(chan struct{}, maxConcurrentRouteCreations)

	for _, node := range nodes {
		// Skip if the node hasn't been assigned a CIDR yet.
		if node.Spec.PodCIDR == "" {
			continue
		}
		nodeName := types.NodeName(node.Name)
		// Check if we have a route for this node w/ the correct CIDR.
		r := routeMap[nodeName]
		if r == nil || r.DestinationCIDR != node.Spec.PodCIDR {
			// If not, create the route.
			route := &cloudprovider.Route{
				TargetNode:      nodeName,
				DestinationCIDR: node.Spec.PodCIDR,
			}
			nameHint := string(node.UID)
			wg.Add(1)
			go func(nodeName types.NodeName, nameHint string, route *cloudprovider.Route) {
				defer wg.Done()
				err := clientretry.RetryOnConflict(updateNetworkConditionBackoff, func() error {
					startTime := time.Now()
					// Ensure that we don't have more than maxConcurrentRouteCreations
					// CreateRoute calls in flight.
					rateLimiter <- struct{}{}
					klog.Infof("Creating route for node %s %s with hint %s, throttled %v", nodeName, route.DestinationCIDR, nameHint, time.Since(startTime))
					err := rc.routes.CreateRoute(context.TODO(), rc.clusterName, nameHint, route)
					<-rateLimiter

					rc.updateNetworkingCondition(nodeName, err == nil)
					if err != nil {
						msg := fmt.Sprintf("Could not create route %s %s for node %s after %v: %v", nameHint, route.DestinationCIDR, nodeName, time.Since(startTime), err)
						if rc.recorder != nil {
							rc.recorder.Eventf(
								&v1.ObjectReference{
									Kind:      "Node",
									Name:      string(nodeName),
									UID:       types.UID(nodeName),
									Namespace: "",
								}, v1.EventTypeWarning, "FailedToCreateRoute", msg)
						}
						klog.V(4).Infof(msg)
						return err
					}
					klog.Infof("Created route for node %s %s with hint %s after %v", nodeName, route.DestinationCIDR, nameHint, time.Now().Sub(startTime))
					return nil
				})
				if err != nil {
					klog.Errorf("Could not create route %s %s for node %s: %v", nameHint, route.DestinationCIDR, nodeName, err)
				}
			}(nodeName, nameHint, route)
		} else {
			// Update condition only if it doesn't reflect the current state.
			_, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeNetworkUnavailable)
			if condition == nil || condition.Status != v1.ConditionFalse {
				rc.updateNetworkingCondition(types.NodeName(node.Name), true)
			}
		}
		nodeCIDRs[nodeName] = node.Spec.PodCIDR
	}
	for _, route := range routes {
		if rc.isResponsibleForRoute(route) {
			// Check if this route is a blackhole, or applies to a node we know about & has an incorrect CIDR.
			if route.Blackhole || (nodeCIDRs[route.TargetNode] != route.DestinationCIDR) {
				wg.Add(1)
				// Delete the route.
				go func(route *cloudprovider.Route, startTime time.Time) {
					defer wg.Done()
					klog.Infof("Deleting route %s %s", route.Name, route.DestinationCIDR)
					if err := rc.routes.DeleteRoute(context.TODO(), rc.clusterName, route); err != nil {
						klog.Errorf("Could not delete route %s %s after %v: %v", route.Name, route.DestinationCIDR, time.Since(startTime), err)
					} else {
						klog.Infof("Deleted route %s %s after %v", route.Name, route.DestinationCIDR, time.Since(startTime))
					}
				}(route, time.Now())
			}
		}
	}
	wg.Wait()
	return nil
}

func (rc *RouteController) updateNetworkingCondition(nodeName types.NodeName, routeCreated bool) error {
	err := clientretry.RetryOnConflict(updateNetworkConditionBackoff, func() error {
		var err error
		// Patch could also fail, even though the chance is very slim. So we still do
		// patch in the retry loop.
		currentTime := metav1.Now()
		if routeCreated {
			err = utilnode.SetNodeCondition(rc.kubeClient, nodeName, v1.NodeCondition{
				Type:               v1.NodeNetworkUnavailable,
				Status:             v1.ConditionFalse,
				Reason:             "RouteCreated",
				Message:            "RouteController created a route",
				LastTransitionTime: currentTime,
			})
		} else {
			err = utilnode.SetNodeCondition(rc.kubeClient, nodeName, v1.NodeCondition{
				Type:               v1.NodeNetworkUnavailable,
				Status:             v1.ConditionTrue,
				Reason:             "NoRouteCreated",
				Message:            "RouteController failed to create a route",
				LastTransitionTime: currentTime,
			})
		}
		if err != nil {
			klog.V(4).Infof("Error updating node %s, retrying: %v", nodeName, err)
		}
		return err
	})

	if err != nil {
		klog.Errorf("Error updating node %s: %v", nodeName, err)
	}

	return err
}

func (rc *RouteController) isResponsibleForRoute(route *cloudprovider.Route) bool {
	_, cidr, err := net.ParseCIDR(route.DestinationCIDR)
	if err != nil {
		klog.Errorf("Ignoring route %s, unparsable CIDR: %v", route.Name, err)
		return false
	}
	// Not responsible if this route's CIDR is not within our clusterCIDR
	lastIP := make([]byte, len(cidr.IP))
	for i := range lastIP {
		lastIP[i] = cidr.IP[i] | ^cidr.Mask[i]
	}
	if !rc.clusterCIDR.Contains(cidr.IP) || !rc.clusterCIDR.Contains(lastIP) {
		return false
	}
	return true
}
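
A note on the containment check in the deleted isResponsibleForRoute: ORing each address byte with the inverted mask byte yields the last address of the route's CIDR (e.g. 10.244.10.0 | 0.0.0.255 = 10.244.10.255), so the controller claims a route only when both its first and last addresses fall inside the cluster CIDR. A small runnable illustration (the helper name is mine, not from this code):

    package main

    import (
        "fmt"
        "net"
    )

    // withinClusterCIDR reproduces the check from isResponsibleForRoute:
    // responsible only if the whole route CIDR sits inside the cluster CIDR.
    func withinClusterCIDR(clusterCIDR *net.IPNet, routeCIDR string) bool {
        _, cidr, err := net.ParseCIDR(routeCIDR)
        if err != nil {
            return false
        }
        lastIP := make(net.IP, len(cidr.IP))
        for i := range lastIP {
            lastIP[i] = cidr.IP[i] | ^cidr.Mask[i] // last address of the route's network
        }
        return clusterCIDR.Contains(cidr.IP) && clusterCIDR.Contains(lastIP)
    }

    func main() {
        _, cluster, _ := net.ParseCIDR("10.244.0.0/16")
        fmt.Println(withinClusterCIDR(cluster, "10.244.10.0/24")) // true
        fmt.Println(withinClusterCIDR(cluster, "10.1.2.0/24"))    // false
    }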
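Also worth noting from reconcile: the buffered channel rateLimiter acts as a counting semaphore that caps in-flight CreateRoute calls at maxConcurrentRouteCreations. A standalone sketch of that pattern (hypothetical worker, smaller limit for demonstration):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        const maxInFlight = 2 // the controller above uses 200 (maxConcurrentRouteCreations)
        sem := make(chan struct{}, maxInFlight)
        var wg sync.WaitGroup
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                sem <- struct{}{}        // blocks while maxInFlight slots are taken
                defer func() { <-sem }() // release the slot when done
                fmt.Println("creating route for node", i)
                time.Sleep(50 * time.Millisecond) // stand-in for the cloud API call
            }(i)
        }
        wg.Wait()
    }
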
@@ -1,316 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package route

import (
	"context"
	"net"
	"testing"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
	cloudprovider "k8s.io/cloud-provider"
	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
	"k8s.io/kubernetes/pkg/controller"
	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
)

func alwaysReady() bool { return true }

func TestIsResponsibleForRoute(t *testing.T) {
	myClusterName := "my-awesome-cluster"
	myClusterRoute := "my-awesome-cluster-12345678-90ab-cdef-1234-567890abcdef"
	testCases := []struct {
		clusterCIDR         string
		routeName           string
		routeCIDR           string
		expectedResponsible bool
	}{
		// Routes that belong to this cluster
		{"10.244.0.0/16", myClusterRoute, "10.244.0.0/24", true},
		{"10.244.0.0/16", myClusterRoute, "10.244.10.0/24", true},
		{"10.244.0.0/16", myClusterRoute, "10.244.255.0/24", true},
		{"10.244.0.0/14", myClusterRoute, "10.244.0.0/24", true},
		{"10.244.0.0/14", myClusterRoute, "10.247.255.0/24", true},
		// Routes that match our naming/tagging scheme, but are outside our cidr
		{"10.244.0.0/16", myClusterRoute, "10.224.0.0/24", false},
		{"10.244.0.0/16", myClusterRoute, "10.0.10.0/24", false},
		{"10.244.0.0/16", myClusterRoute, "10.255.255.0/24", false},
		{"10.244.0.0/14", myClusterRoute, "10.248.0.0/24", false},
		{"10.244.0.0/14", myClusterRoute, "10.243.255.0/24", false},
	}
	for i, testCase := range testCases {
		_, cidr, err := net.ParseCIDR(testCase.clusterCIDR)
		if err != nil {
			t.Errorf("%d. Error in test case: unparsable cidr %q", i, testCase.clusterCIDR)
		}
		client := fake.NewSimpleClientset()
		informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
		rc := New(nil, nil, informerFactory.Core().V1().Nodes(), myClusterName, cidr)
		rc.nodeListerSynced = alwaysReady
		route := &cloudprovider.Route{
			Name:            testCase.routeName,
			TargetNode:      types.NodeName("doesnt-matter-for-this-test"),
			DestinationCIDR: testCase.routeCIDR,
		}
		if resp := rc.isResponsibleForRoute(route); resp != testCase.expectedResponsible {
			t.Errorf("%d. isResponsibleForRoute() = %t; want %t", i, resp, testCase.expectedResponsible)
		}
	}
}

func TestReconcile(t *testing.T) {
	cluster := "my-k8s"
	node1 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1", UID: "01"}, Spec: v1.NodeSpec{PodCIDR: "10.120.0.0/24"}}
	node2 := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", UID: "02"}, Spec: v1.NodeSpec{PodCIDR: "10.120.1.0/24"}}
	nodeNoCidr := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", UID: "02"}, Spec: v1.NodeSpec{PodCIDR: ""}}

	testCases := []struct {
		nodes                      []*v1.Node
		initialRoutes              []*cloudprovider.Route
		expectedRoutes             []*cloudprovider.Route
		expectedNetworkUnavailable []bool
		clientset                  *fake.Clientset
	}{
		// 2 nodes, routes already there
		{
			nodes: []*v1.Node{
				&node1,
				&node2,
			},
			initialRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
			},
			expectedRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
			},
			expectedNetworkUnavailable: []bool{true, true},
			clientset:                  fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{node1, node2}}),
		},
		// 2 nodes, one route already there
		{
			nodes: []*v1.Node{
				&node1,
				&node2,
			},
			initialRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
			},
			expectedRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
			},
			expectedNetworkUnavailable: []bool{true, true},
			clientset:                  fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{node1, node2}}),
		},
		// 2 nodes, no routes yet
		{
			nodes: []*v1.Node{
				&node1,
				&node2,
			},
			initialRoutes: []*cloudprovider.Route{},
			expectedRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
			},
			expectedNetworkUnavailable: []bool{true, true},
			clientset:                  fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{node1, node2}}),
		},
		// 2 nodes, a few too many routes
		{
			nodes: []*v1.Node{
				&node1,
				&node2,
			},
			initialRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
				{Name: cluster + "-03", TargetNode: "node-3", DestinationCIDR: "10.120.2.0/24", Blackhole: false},
				{Name: cluster + "-04", TargetNode: "node-4", DestinationCIDR: "10.120.3.0/24", Blackhole: false},
			},
			expectedRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
			},
			expectedNetworkUnavailable: []bool{true, true},
			clientset:                  fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{node1, node2}}),
		},
		// 2 nodes, 2 routes, but only 1 is right
		{
			nodes: []*v1.Node{
				&node1,
				&node2,
			},
			initialRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-03", TargetNode: "node-3", DestinationCIDR: "10.120.2.0/24", Blackhole: false},
			},
			expectedRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
			},
			expectedNetworkUnavailable: []bool{true, true},
			clientset:                  fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{node1, node2}}),
		},
		// 2 nodes, one node without CIDR assigned.
		{
			nodes: []*v1.Node{
				&node1,
				&nodeNoCidr,
			},
			initialRoutes: []*cloudprovider.Route{},
			expectedRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
			},
			expectedNetworkUnavailable: []bool{true, false},
			clientset:                  fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{node1, nodeNoCidr}}),
		},
		// 2 nodes, an extra blackhole route in our range
		{
			nodes: []*v1.Node{
				&node1,
				&node2,
			},
			initialRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
				{Name: cluster + "-03", TargetNode: "", DestinationCIDR: "10.120.2.0/24", Blackhole: true},
			},
			expectedRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
			},
			expectedNetworkUnavailable: []bool{true, true},
			clientset:                  fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{node1, node2}}),
		},
		// 2 nodes, an extra blackhole route not in our range
		{
			nodes: []*v1.Node{
				&node1,
				&node2,
			},
			initialRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
				{Name: cluster + "-03", TargetNode: "", DestinationCIDR: "10.1.2.0/24", Blackhole: true},
			},
			expectedRoutes: []*cloudprovider.Route{
				{Name: cluster + "-01", TargetNode: "node-1", DestinationCIDR: "10.120.0.0/24", Blackhole: false},
				{Name: cluster + "-02", TargetNode: "node-2", DestinationCIDR: "10.120.1.0/24", Blackhole: false},
				{Name: cluster + "-03", TargetNode: "", DestinationCIDR: "10.1.2.0/24", Blackhole: true},
			},
			expectedNetworkUnavailable: []bool{true, true},
			clientset:                  fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{node1, node2}}),
		},
	}
	for i, testCase := range testCases {
		cloud := &fakecloud.FakeCloud{RouteMap: make(map[string]*fakecloud.FakeRoute)}
		for _, route := range testCase.initialRoutes {
			fakeRoute := &fakecloud.FakeRoute{}
			fakeRoute.ClusterName = cluster
			fakeRoute.Route = *route
			cloud.RouteMap[route.Name] = fakeRoute
		}
		routes, ok := cloud.Routes()
		if !ok {
			t.Error("Error in test: fakecloud doesn't support Routes()")
		}
		_, cidr, _ := net.ParseCIDR("10.120.0.0/16")
		informerFactory := informers.NewSharedInformerFactory(testCase.clientset, controller.NoResyncPeriodFunc())
		rc := New(routes, testCase.clientset, informerFactory.Core().V1().Nodes(), cluster, cidr)
		rc.nodeListerSynced = alwaysReady
		if err := rc.reconcile(testCase.nodes, testCase.initialRoutes); err != nil {
			t.Errorf("%d. Error from rc.reconcile(): %v", i, err)
		}
		for _, action := range testCase.clientset.Actions() {
			if action.GetVerb() == "update" && action.GetResource().Resource == "nodes" {
				node := action.(core.UpdateAction).GetObject().(*v1.Node)
				_, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeNetworkUnavailable)
				if condition == nil {
					t.Errorf("%d. Missing NodeNetworkUnavailable condition for Node %v", i, node.Name)
				} else {
					check := func(index int) bool {
						return (condition.Status == v1.ConditionFalse) == testCase.expectedNetworkUnavailable[index]
					}
					index := -1
					for j := range testCase.nodes {
						if testCase.nodes[j].Name == node.Name {
							index = j
						}
					}
					if index == -1 {
						// Something's wrong
						continue
					}
					if !check(index) {
						t.Errorf("%d. Invalid NodeNetworkUnavailable condition for Node %v, expected %v, got %v",
							i, node.Name, testCase.expectedNetworkUnavailable[index], (condition.Status == v1.ConditionFalse))
					}
				}
			}
		}
		var finalRoutes []*cloudprovider.Route
		var err error
		timeoutChan := time.After(200 * time.Millisecond)
		tick := time.NewTicker(10 * time.Millisecond)
		defer tick.Stop()
	poll:
		for {
			select {
			case <-tick.C:
				if finalRoutes, err = routes.ListRoutes(context.TODO(), cluster); err == nil && routeListEqual(finalRoutes, testCase.expectedRoutes) {
					break poll
				}
			case <-timeoutChan:
				t.Errorf("%d. rc.reconcile() = %v, routes:\n%v\nexpected: nil, routes:\n%v\n", i, err, flatten(finalRoutes), flatten(testCase.expectedRoutes))
				break poll
			}
		}
	}
}

func routeListEqual(list1, list2 []*cloudprovider.Route) bool {
	if len(list1) != len(list2) {
		return false
	}
	routeMap1 := make(map[string]*cloudprovider.Route)
	for _, route1 := range list1 {
		routeMap1[route1.Name] = route1
	}
	for _, route2 := range list2 {
		if route1, exists := routeMap1[route2.Name]; !exists || *route1 != *route2 {
			return false
		}
	}
	return true
}

func flatten(list []*cloudprovider.Route) []cloudprovider.Route {
	var structList []cloudprovider.Route
	for _, route := range list {
		structList = append(structList, *route)
	}
	return structList
}
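
The deleted TestReconcile polls the fake cloud until the expected routes appear or a deadline passes, since route creation and deletion happen in goroutines. A standalone sketch of that ticker-plus-timeout idiom (the condition is a stand-in, not the kube code):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        start := time.Now()
        timeout := time.After(200 * time.Millisecond)
        tick := time.NewTicker(10 * time.Millisecond)
        defer tick.Stop()
    poll:
        for {
            select {
            case <-tick.C:
                // Stand-in for "the expected routes showed up in the fake cloud".
                if time.Since(start) > 50*time.Millisecond {
                    fmt.Println("condition met")
                    break poll
                }
            case <-timeout:
                fmt.Println("timed out waiting for condition")
                break poll
            }
        }
    }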