From 9187b343e17db2bf8f3470cd6e1ef7f661814c15 Mon Sep 17 00:00:00 2001
From: Walter Fender
Date: Wed, 11 Oct 2017 16:36:39 -0700
Subject: [PATCH] Split the NodeController into lifecycle and ipam pieces.

Preparatory work for removing the cloud provider dependency from the node
controller running in the Kube Controller Manager. This splits the node
controller into its two major pieces: life-cycle and CIDR/IP management.
Both pieces currently need the cloud system to do their work. Removing the
lifecycle controller's dependency on the cloud provider will be fixed in a
follow-up PR.

Moved node scheduler code to live with the node lifecycle controller.
Completed the IPAM/lifecycle split; pieces still need to be renamed.
Made changes to the utils and tests so they sit in the appropriate packages.
Moved the node-based IPAM code to nodeipam.
Made the relevant tests pass.
Moved common node controller util code to nodeutil.
Removed unneeded pod informer sync from the node IPAM controller.
Fixed linter issues.
Factored in feedback from @gmarek.
Factored in feedback from @mtaufen.
Undid an unneeded change.
---
 cmd/kube-controller-manager/app/BUILD | 5 +-
 .../app/controllermanager.go | 6 +-
 cmd/kube-controller-manager/app/core.go | 44 +-
 pkg/controller/BUILD | 4 +-
 pkg/controller/nodeipam/BUILD | 48 ++
 pkg/controller/{node => nodeipam}/OWNERS | 0
 pkg/controller/{node => nodeipam}/doc.go | 4 +-
 pkg/controller/{node => nodeipam}/ipam/BUILD | 20 +-
 pkg/controller/{node => nodeipam}/ipam/OWNERS | 0
 .../{node => nodeipam}/ipam/adapter.go | 0
 .../{node => nodeipam}/ipam/cidr_allocator.go | 0
 .../{node => nodeipam}/ipam/cidrset/BUILD | 4 +-
 .../ipam/cidrset/cidr_set.go | 0
 .../ipam/cidrset/cidr_set_test.go | 0
 .../ipam/cloud_cidr_allocator.go | 14 +-
 .../{node => nodeipam}/ipam/controller.go | 12 +-
 .../ipam/controller_test.go | 4 +-
 pkg/controller/{node => nodeipam}/ipam/doc.go | 0
 .../ipam/range_allocator.go | 18 +-
 .../ipam/range_allocator_test.go | 0
 .../{node => nodeipam}/ipam/sync/BUILD | 10 +-
 .../{node => nodeipam}/ipam/sync/sync.go | 2 +-
 .../{node => nodeipam}/ipam/sync/sync_test.go | 4 +-
 .../{node => nodeipam}/ipam/test/BUILD | 2 +-
 .../{node => nodeipam}/ipam/test/utils.go | 0
 .../{node => nodeipam}/ipam/timeout.go | 0
 .../{node => nodeipam}/ipam/timeout_test.go | 0
 pkg/controller/nodeipam/metrics.go | 21 +
 .../nodeipam/node_ipam_controller.go | 187 +++++
 pkg/controller/{node => nodelifecycle}/BUILD | 134 ++--
 .../{node => nodelifecycle}/metrics.go | 4 +-
 .../node_lifecycle_controller.go} | 753 ++++++++----
 .../node_lifecycle_controller_test.go} | 285 ++++---
 .../{node => nodelifecycle}/scheduler/BUILD | 54 +-
 .../scheduler/rate_limited_queue.go | 0
 .../scheduler/rate_limited_queue_test.go | 0
 .../scheduler/taint_manager.go} | 5 +-
 .../scheduler/taint_manager_test.go} | 0
 .../scheduler/timed_workers.go | 0
 .../scheduler/timed_workers_test.go | 0
 pkg/controller/{node/util => util/node}/BUILD | 11 +-
 .../util => util/node}/controller_utils.go | 12 +-
 test/e2e/apps/BUILD | 2 +-
 test/e2e/apps/network_partition.go | 2 +-
 test/e2e/framework/BUILD | 2 +-
 test/e2e/framework/util.go | 2 +-
 test/integration/garbagecollector/BUILD | 8 +-
 test/integration/scheduler/BUILD | 3 +-
 test/integration/scheduler/taint_test.go | 24 +-
 49 files changed, 972 insertions(+), 738 deletions(-)
 create mode 100644 pkg/controller/nodeipam/BUILD
 rename pkg/controller/{node => nodeipam}/OWNERS (100%)
 rename pkg/controller/{node => nodeipam}/doc.go (80%)
 rename pkg/controller/{node => nodeipam}/ipam/BUILD (82%)
 rename pkg/controller/{node => nodeipam}/ipam/OWNERS (100%)
rename pkg/controller/{node => nodeipam}/ipam/adapter.go (100%) rename pkg/controller/{node => nodeipam}/ipam/cidr_allocator.go (100%) rename pkg/controller/{node => nodeipam}/ipam/cidrset/BUILD (80%) rename pkg/controller/{node => nodeipam}/ipam/cidrset/cidr_set.go (100%) rename pkg/controller/{node => nodeipam}/ipam/cidrset/cidr_set_test.go (100%) rename pkg/controller/{node => nodeipam}/ipam/cloud_cidr_allocator.go (94%) rename pkg/controller/{node => nodeipam}/ipam/controller.go (93%) rename pkg/controller/{node => nodeipam}/ipam/controller_test.go (94%) rename pkg/controller/{node => nodeipam}/ipam/doc.go (100%) rename pkg/controller/{node => nodeipam}/ipam/range_allocator.go (94%) rename pkg/controller/{node => nodeipam}/ipam/range_allocator_test.go (100%) rename pkg/controller/{node => nodeipam}/ipam/sync/BUILD (72%) rename pkg/controller/{node => nodeipam}/ipam/sync/sync.go (99%) rename pkg/controller/{node => nodeipam}/ipam/sync/sync_test.go (98%) rename pkg/controller/{node => nodeipam}/ipam/test/BUILD (85%) rename pkg/controller/{node => nodeipam}/ipam/test/utils.go (100%) rename pkg/controller/{node => nodeipam}/ipam/timeout.go (100%) rename pkg/controller/{node => nodeipam}/ipam/timeout_test.go (100%) create mode 100644 pkg/controller/nodeipam/metrics.go create mode 100644 pkg/controller/nodeipam/node_ipam_controller.go rename pkg/controller/{node => nodelifecycle}/BUILD (78%) rename pkg/controller/{node => nodelifecycle}/metrics.go (97%) rename pkg/controller/{node/node_controller.go => nodelifecycle/node_lifecycle_controller.go} (84%) rename pkg/controller/{node/nodecontroller_test.go => nodelifecycle/node_lifecycle_controller_test.go} (93%) rename pkg/controller/{node => nodelifecycle}/scheduler/BUILD (85%) rename pkg/controller/{node => nodelifecycle}/scheduler/rate_limited_queue.go (100%) rename pkg/controller/{node => nodelifecycle}/scheduler/rate_limited_queue_test.go (100%) rename pkg/controller/{node/scheduler/taint_controller.go => nodelifecycle/scheduler/taint_manager.go} (99%) rename pkg/controller/{node/scheduler/taint_controller_test.go => nodelifecycle/scheduler/taint_manager_test.go} (100%) rename pkg/controller/{node => nodelifecycle}/scheduler/timed_workers.go (100%) rename pkg/controller/{node => nodelifecycle}/scheduler/timed_workers_test.go (100%) rename pkg/controller/{node/util => util/node}/BUILD (87%) rename pkg/controller/{node/util => util/node}/controller_utils.go (96%) diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 7bee9288b4..d6940c30d3 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -58,8 +58,9 @@ go_library( "//pkg/controller/garbagecollector:go_default_library", "//pkg/controller/job:go_default_library", "//pkg/controller/namespace:go_default_library", - "//pkg/controller/node:go_default_library", - "//pkg/controller/node/ipam:go_default_library", + "//pkg/controller/nodeipam:go_default_library", + "//pkg/controller/nodeipam/ipam:go_default_library", + "//pkg/controller/nodelifecycle:go_default_library", "//pkg/controller/podautoscaler:go_default_library", "//pkg/controller/podautoscaler/metrics:go_default_library", "//pkg/controller/podgc:go_default_library", diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 1f7832ed9d..85dc51de5c 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -368,10 
+368,12 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc controllers["tokencleaner"] = startTokenCleanerController if loopMode == IncludeCloudLoops { controllers["service"] = startServiceController + controllers["nodeipam"] = startNodeIpamController controllers["route"] = startRouteController - // TODO: Move node controller and volume controller into the IncludeCloudLoops only set. + // TODO: volume controller into the IncludeCloudLoops only set. + // TODO: Separate cluster in cloud check from node lifecycle controller. } - controllers["node"] = startNodeController + controllers["nodelifecycle"] = startNodeLifecycleController controllers["persistentvolume-binder"] = startPersistentVolumeBinderController controllers["attachdetach"] = startAttachDetachController controllers["persistentvolume-expander"] = startVolumeExpandController diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 819d18e0f0..2fceb2370b 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -41,8 +41,9 @@ import ( endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/garbagecollector" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" - nodecontroller "k8s.io/kubernetes/pkg/controller/node" - "k8s.io/kubernetes/pkg/controller/node/ipam" + nodeipamcontroller "k8s.io/kubernetes/pkg/controller/nodeipam" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam" + lifecyclecontroller "k8s.io/kubernetes/pkg/controller/nodelifecycle" "k8s.io/kubernetes/pkg/controller/podgc" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" @@ -77,7 +78,7 @@ func startServiceController(ctx ControllerContext) (bool, error) { return true, nil } -func startNodeController(ctx ControllerContext) (bool, error) { +func startNodeIpamController(ctx ControllerContext) (bool, error) { var clusterCIDR *net.IPNet = nil var serviceCIDR *net.IPNet = nil if ctx.Options.AllocateNodeCIDRs { @@ -97,25 +98,38 @@ func startNodeController(ctx ControllerContext) (bool, error) { } } - nodeController, err := nodecontroller.NewNodeController( - ctx.InformerFactory.Core().V1().Pods(), + nodeIpamController, err := nodeipamcontroller.NewNodeIpamController( ctx.InformerFactory.Core().V1().Nodes(), - ctx.InformerFactory.Extensions().V1beta1().DaemonSets(), ctx.Cloud, ctx.ClientBuilder.ClientOrDie("node-controller"), - ctx.Options.PodEvictionTimeout.Duration, - ctx.Options.NodeEvictionRate, - ctx.Options.SecondaryNodeEvictionRate, - ctx.Options.LargeClusterSizeThreshold, - ctx.Options.UnhealthyZoneThreshold, - ctx.Options.NodeMonitorGracePeriod.Duration, - ctx.Options.NodeStartupGracePeriod.Duration, - ctx.Options.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(ctx.Options.NodeCIDRMaskSize), ctx.Options.AllocateNodeCIDRs, ipam.CIDRAllocatorType(ctx.Options.CIDRAllocatorType), + ) + if err != nil { + return true, err + } + go nodeIpamController.Run(ctx.Stop) + return true, nil +} + +func startNodeLifecycleController(ctx ControllerContext) (bool, error) { + lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController( + ctx.InformerFactory.Core().V1().Pods(), + ctx.InformerFactory.Core().V1().Nodes(), + ctx.InformerFactory.Extensions().V1beta1().DaemonSets(), + ctx.Cloud, + ctx.ClientBuilder.ClientOrDie("node-controller"), + ctx.Options.NodeMonitorPeriod.Duration, + 
ctx.Options.NodeStartupGracePeriod.Duration, + ctx.Options.NodeMonitorGracePeriod.Duration, + ctx.Options.PodEvictionTimeout.Duration, + ctx.Options.NodeEvictionRate, + ctx.Options.SecondaryNodeEvictionRate, + ctx.Options.LargeClusterSizeThreshold, + ctx.Options.UnhealthyZoneThreshold, ctx.Options.EnableTaintManager, utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions), utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition), @@ -123,7 +137,7 @@ func startNodeController(ctx ControllerContext) (bool, error) { if err != nil { return true, err } - go nodeController.Run(ctx.Stop) + go lifecycleController.Run(ctx.Stop) return true, nil } diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index f1e808f2ee..7c2db32b5d 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -117,7 +117,8 @@ filegroup( "//pkg/controller/history:all-srcs", "//pkg/controller/job:all-srcs", "//pkg/controller/namespace:all-srcs", - "//pkg/controller/node:all-srcs", + "//pkg/controller/nodeipam:all-srcs", + "//pkg/controller/nodelifecycle:all-srcs", "//pkg/controller/podautoscaler:all-srcs", "//pkg/controller/podgc:all-srcs", "//pkg/controller/replicaset:all-srcs", @@ -129,6 +130,7 @@ filegroup( "//pkg/controller/statefulset:all-srcs", "//pkg/controller/testutil:all-srcs", "//pkg/controller/ttl:all-srcs", + "//pkg/controller/util/node:all-srcs", "//pkg/controller/volume/attachdetach:all-srcs", "//pkg/controller/volume/events:all-srcs", "//pkg/controller/volume/expand:all-srcs", diff --git a/pkg/controller/nodeipam/BUILD b/pkg/controller/nodeipam/BUILD new file mode 100644 index 0000000000..46a62bb93b --- /dev/null +++ b/pkg/controller/nodeipam/BUILD @@ -0,0 +1,48 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "metrics.go", + "node_ipam_controller.go", + ], + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam", + deps = [ + "//pkg/cloudprovider:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/nodeipam/ipam:go_default_library", + "//pkg/controller/nodeipam/ipam/sync:go_default_library", + "//pkg/util/metrics:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/controller/nodeipam/ipam:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/pkg/controller/node/OWNERS b/pkg/controller/nodeipam/OWNERS similarity index 100% rename from pkg/controller/node/OWNERS rename to pkg/controller/nodeipam/OWNERS diff --git a/pkg/controller/node/doc.go b/pkg/controller/nodeipam/doc.go similarity index 80% rename from pkg/controller/node/doc.go rename to pkg/controller/nodeipam/doc.go index b649f1dda4..a7b2d12db8 100644 --- a/pkg/controller/node/doc.go +++ 
b/pkg/controller/nodeipam/doc.go @@ -14,6 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package node contains code for syncing cloud instances with +// Package nodeipam contains code for syncing cloud instances with // node registry -package node // import "k8s.io/kubernetes/pkg/controller/node" +package nodeipam // import "k8s.io/kubernetes/pkg/controller/nodeipam" diff --git a/pkg/controller/node/ipam/BUILD b/pkg/controller/nodeipam/ipam/BUILD similarity index 82% rename from pkg/controller/node/ipam/BUILD rename to pkg/controller/nodeipam/ipam/BUILD index 667f29f6b0..5a1e1018a2 100644 --- a/pkg/controller/node/ipam/BUILD +++ b/pkg/controller/nodeipam/ipam/BUILD @@ -14,11 +14,11 @@ go_test( "timeout_test.go", ], embed = [":go_default_library"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam", deps = [ "//pkg/controller:go_default_library", - "//pkg/controller/node/ipam/cidrset:go_default_library", - "//pkg/controller/node/ipam/test:go_default_library", + "//pkg/controller/nodeipam/ipam/cidrset:go_default_library", + "//pkg/controller/nodeipam/ipam/test:go_default_library", "//pkg/controller/testutil:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -40,15 +40,15 @@ go_library( "range_allocator.go", "timeout.go", ], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam", deps = [ "//pkg/api/v1/node:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/controller:go_default_library", - "//pkg/controller/node/ipam/cidrset:go_default_library", - "//pkg/controller/node/ipam/sync:go_default_library", - "//pkg/controller/node/util:go_default_library", + "//pkg/controller/nodeipam/ipam/cidrset:go_default_library", + "//pkg/controller/nodeipam/ipam/sync:go_default_library", + "//pkg/controller/util/node:go_default_library", "//pkg/util/node:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", @@ -82,9 +82,9 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", - "//pkg/controller/node/ipam/cidrset:all-srcs", - "//pkg/controller/node/ipam/sync:all-srcs", - "//pkg/controller/node/ipam/test:all-srcs", + "//pkg/controller/nodeipam/ipam/cidrset:all-srcs", + "//pkg/controller/nodeipam/ipam/sync:all-srcs", + "//pkg/controller/nodeipam/ipam/test:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/controller/node/ipam/OWNERS b/pkg/controller/nodeipam/ipam/OWNERS similarity index 100% rename from pkg/controller/node/ipam/OWNERS rename to pkg/controller/nodeipam/ipam/OWNERS diff --git a/pkg/controller/node/ipam/adapter.go b/pkg/controller/nodeipam/ipam/adapter.go similarity index 100% rename from pkg/controller/node/ipam/adapter.go rename to pkg/controller/nodeipam/ipam/adapter.go diff --git a/pkg/controller/node/ipam/cidr_allocator.go b/pkg/controller/nodeipam/ipam/cidr_allocator.go similarity index 100% rename from pkg/controller/node/ipam/cidr_allocator.go rename to pkg/controller/nodeipam/ipam/cidr_allocator.go diff --git a/pkg/controller/node/ipam/cidrset/BUILD b/pkg/controller/nodeipam/ipam/cidrset/BUILD similarity index 80% rename from pkg/controller/node/ipam/cidrset/BUILD rename to pkg/controller/nodeipam/ipam/cidrset/BUILD index 
e3accb73bc..c1bbda1c69 100644 --- a/pkg/controller/node/ipam/cidrset/BUILD +++ b/pkg/controller/nodeipam/ipam/cidrset/BUILD @@ -10,14 +10,14 @@ go_test( name = "go_default_test", srcs = ["cidr_set_test.go"], embed = [":go_default_library"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset", deps = ["//vendor/github.com/golang/glog:go_default_library"], ) go_library( name = "go_default_library", srcs = ["cidr_set.go"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset", ) filegroup( diff --git a/pkg/controller/node/ipam/cidrset/cidr_set.go b/pkg/controller/nodeipam/ipam/cidrset/cidr_set.go similarity index 100% rename from pkg/controller/node/ipam/cidrset/cidr_set.go rename to pkg/controller/nodeipam/ipam/cidrset/cidr_set.go diff --git a/pkg/controller/node/ipam/cidrset/cidr_set_test.go b/pkg/controller/nodeipam/ipam/cidrset/cidr_set_test.go similarity index 100% rename from pkg/controller/node/ipam/cidrset/cidr_set_test.go rename to pkg/controller/nodeipam/ipam/cidrset/cidr_set_test.go diff --git a/pkg/controller/node/ipam/cloud_cidr_allocator.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go similarity index 94% rename from pkg/controller/node/ipam/cloud_cidr_allocator.go rename to pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go index 98b65b22e6..7a07409c7c 100644 --- a/pkg/controller/node/ipam/cloud_cidr_allocator.go +++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go @@ -40,7 +40,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/node/util" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" utilnode "k8s.io/kubernetes/pkg/util/node" ) @@ -101,8 +101,8 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter } nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR), - UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + AddFunc: nodeutil.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR), + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { if newNode.Spec.PodCIDR == "" { return ca.AllocateOrOccupyCIDR(newNode) } @@ -114,7 +114,7 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter } return nil }), - DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR), + DeleteFunc: nodeutil.CreateDeleteNodeHandler(ca.ReleaseCIDR), }) glog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName()) @@ -197,11 +197,11 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error { cidrs, err := ca.cloud.AliasRanges(types.NodeName(nodeName)) if err != nil { - util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") + nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: %v", err) } if len(cidrs) == 0 { - util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") + nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name) } _, cidr, err := net.ParseCIDR(cidrs[0]) @@ -237,7 +237,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error { glog.Errorf("Failed to update node %v 
PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err) } if err != nil { - util.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed") + nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed") glog.Errorf("CIDR assignment for node %v failed: %v.", nodeName, err) return err } diff --git a/pkg/controller/node/ipam/controller.go b/pkg/controller/nodeipam/ipam/controller.go similarity index 93% rename from pkg/controller/node/ipam/controller.go rename to pkg/controller/nodeipam/ipam/controller.go index 4b1221b678..6ab18d69f6 100644 --- a/pkg/controller/node/ipam/controller.go +++ b/pkg/controller/nodeipam/ipam/controller.go @@ -30,9 +30,9 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" - "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" - nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync" - "k8s.io/kubernetes/pkg/controller/node/util" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" + nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" ) // Config for the IPAM controller. @@ -128,9 +128,9 @@ func (c *Controller) Start(nodeInformer informers.NodeInformer) error { } nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(c.onAdd), - UpdateFunc: util.CreateUpdateNodeHandler(c.onUpdate), - DeleteFunc: util.CreateDeleteNodeHandler(c.onDelete), + AddFunc: nodeutil.CreateAddNodeHandler(c.onAdd), + UpdateFunc: nodeutil.CreateUpdateNodeHandler(c.onUpdate), + DeleteFunc: nodeutil.CreateDeleteNodeHandler(c.onDelete), }) return nil diff --git a/pkg/controller/node/ipam/controller_test.go b/pkg/controller/nodeipam/ipam/controller_test.go similarity index 94% rename from pkg/controller/node/ipam/controller_test.go rename to pkg/controller/nodeipam/ipam/controller_test.go index 14fbb4340f..6e5a6f9957 100644 --- a/pkg/controller/node/ipam/controller_test.go +++ b/pkg/controller/nodeipam/ipam/controller_test.go @@ -20,8 +20,8 @@ import ( "net" "testing" - "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" - "k8s.io/kubernetes/pkg/controller/node/ipam/test" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test" ) func TestOccupyServiceCIDR(t *testing.T) { diff --git a/pkg/controller/node/ipam/doc.go b/pkg/controller/nodeipam/ipam/doc.go similarity index 100% rename from pkg/controller/node/ipam/doc.go rename to pkg/controller/nodeipam/ipam/doc.go diff --git a/pkg/controller/node/ipam/range_allocator.go b/pkg/controller/nodeipam/ipam/range_allocator.go similarity index 94% rename from pkg/controller/node/ipam/range_allocator.go rename to pkg/controller/nodeipam/ipam/range_allocator.go index d3037b1d1d..5de2195854 100644 --- a/pkg/controller/node/ipam/range_allocator.go +++ b/pkg/controller/nodeipam/ipam/range_allocator.go @@ -36,9 +36,9 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" - "k8s.io/kubernetes/pkg/controller/node/util" - nodeutil "k8s.io/kubernetes/pkg/util/node" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" + utilnode "k8s.io/kubernetes/pkg/util/node" ) type rangeAllocator struct { @@ -119,8 +119,8 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer 
informers.No } nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR), - UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + AddFunc: nodeutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR), + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { // If the PodCIDR is not empty we either: // - already processed a Node that already had a CIDR after NC restarted // (cidr is marked as used), @@ -145,7 +145,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No } return nil }), - DeleteFunc: util.CreateDeleteNodeHandler(ra.ReleaseCIDR), + DeleteFunc: nodeutil.CreateDeleteNodeHandler(ra.ReleaseCIDR), }) return ra, nil @@ -234,7 +234,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error { podCIDR, err := r.cidrs.AllocateNext() if err != nil { r.removeNodeFromProcessing(node.Name) - util.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") + nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: %v", err) } @@ -303,14 +303,14 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error { } return nil } - if err = nodeutil.PatchNodeCIDR(r.client, types.NodeName(node.Name), podCIDR); err == nil { + if err = utilnode.PatchNodeCIDR(r.client, types.NodeName(node.Name), podCIDR); err == nil { glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR) break } glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err) } if err != nil { - util.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") + nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") // We accept the fact that we may leek CIDRs here. This is safer than releasing // them in case when we don't know if request went through. // NodeController restart will return all falsely allocated CIDRs to the pool. 
diff --git a/pkg/controller/node/ipam/range_allocator_test.go b/pkg/controller/nodeipam/ipam/range_allocator_test.go similarity index 100% rename from pkg/controller/node/ipam/range_allocator_test.go rename to pkg/controller/nodeipam/ipam/range_allocator_test.go diff --git a/pkg/controller/node/ipam/sync/BUILD b/pkg/controller/nodeipam/ipam/sync/BUILD similarity index 72% rename from pkg/controller/node/ipam/sync/BUILD rename to pkg/controller/nodeipam/ipam/sync/BUILD index 6530b5d812..2ecba089f4 100644 --- a/pkg/controller/node/ipam/sync/BUILD +++ b/pkg/controller/nodeipam/ipam/sync/BUILD @@ -3,10 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = ["sync.go"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/sync", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync", visibility = ["//visibility:public"], deps = [ - "//pkg/controller/node/ipam/cidrset:go_default_library", + "//pkg/controller/nodeipam/ipam/cidrset:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", ], @@ -16,10 +16,10 @@ go_test( name = "go_default_test", srcs = ["sync_test.go"], embed = [":go_default_library"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/sync", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync", deps = [ - "//pkg/controller/node/ipam/cidrset:go_default_library", - "//pkg/controller/node/ipam/test:go_default_library", + "//pkg/controller/nodeipam/ipam/cidrset:go_default_library", + "//pkg/controller/nodeipam/ipam/test:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/controller/node/ipam/sync/sync.go b/pkg/controller/nodeipam/ipam/sync/sync.go similarity index 99% rename from pkg/controller/node/ipam/sync/sync.go rename to pkg/controller/nodeipam/ipam/sync/sync.go index 4995f42554..fabc3a1126 100644 --- a/pkg/controller/node/ipam/sync/sync.go +++ b/pkg/controller/nodeipam/ipam/sync/sync.go @@ -25,7 +25,7 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" - "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" ) const ( diff --git a/pkg/controller/node/ipam/sync/sync_test.go b/pkg/controller/nodeipam/ipam/sync/sync_test.go similarity index 98% rename from pkg/controller/node/ipam/sync/sync_test.go rename to pkg/controller/nodeipam/ipam/sync/sync_test.go index d326848043..4a47280d94 100644 --- a/pkg/controller/node/ipam/sync/sync_test.go +++ b/pkg/controller/nodeipam/ipam/sync/sync_test.go @@ -26,8 +26,8 @@ import ( "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" - "k8s.io/kubernetes/pkg/controller/node/ipam/test" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test" "k8s.io/api/core/v1" ) diff --git a/pkg/controller/node/ipam/test/BUILD b/pkg/controller/nodeipam/ipam/test/BUILD similarity index 85% rename from pkg/controller/node/ipam/test/BUILD rename to pkg/controller/nodeipam/ipam/test/BUILD index 38155ed097..0c6fd3a281 100644 --- a/pkg/controller/node/ipam/test/BUILD +++ b/pkg/controller/nodeipam/ipam/test/BUILD @@ -3,7 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = ["utils.go"], - 
importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/test", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test", visibility = ["//visibility:public"], ) diff --git a/pkg/controller/node/ipam/test/utils.go b/pkg/controller/nodeipam/ipam/test/utils.go similarity index 100% rename from pkg/controller/node/ipam/test/utils.go rename to pkg/controller/nodeipam/ipam/test/utils.go diff --git a/pkg/controller/node/ipam/timeout.go b/pkg/controller/nodeipam/ipam/timeout.go similarity index 100% rename from pkg/controller/node/ipam/timeout.go rename to pkg/controller/nodeipam/ipam/timeout.go diff --git a/pkg/controller/node/ipam/timeout_test.go b/pkg/controller/nodeipam/ipam/timeout_test.go similarity index 100% rename from pkg/controller/node/ipam/timeout_test.go rename to pkg/controller/nodeipam/ipam/timeout_test.go diff --git a/pkg/controller/nodeipam/metrics.go b/pkg/controller/nodeipam/metrics.go new file mode 100644 index 0000000000..9211ce3f38 --- /dev/null +++ b/pkg/controller/nodeipam/metrics.go @@ -0,0 +1,21 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeipam + +// Register the metrics that are to be monitored. +func Register() { +} diff --git a/pkg/controller/nodeipam/node_ipam_controller.go b/pkg/controller/nodeipam/node_ipam_controller.go new file mode 100644 index 0000000000..e2dad9e4f5 --- /dev/null +++ b/pkg/controller/nodeipam/node_ipam_controller.go @@ -0,0 +1,187 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeipam + +import ( + "net" + "time" + + "github.com/golang/glog" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + + "k8s.io/api/core/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam" + nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync" + "k8s.io/kubernetes/pkg/util/metrics" +) + +func init() { + // Register prometheus metrics + Register() +} + +const ( + // ipamResyncInterval is the amount of time between when the cloud and node + // CIDR range assignments are synchronized. 
+ ipamResyncInterval = 30 * time.Second + // ipamMaxBackoff is the maximum backoff for retrying synchronization of a + // given in the error state. + ipamMaxBackoff = 10 * time.Second + // ipamInitialRetry is the initial retry interval for retrying synchronization of a + // given in the error state. + ipamInitialBackoff = 250 * time.Millisecond +) + +// Controller is the controller that manages node ipam state. +type Controller struct { + allocateNodeCIDRs bool + allocatorType ipam.CIDRAllocatorType + + cloud cloudprovider.Interface + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + kubeClient clientset.Interface + // Method for easy mocking in unittest. + lookupIP func(host string) ([]net.IP, error) + + nodeLister corelisters.NodeLister + nodeInformerSynced cache.InformerSynced + + cidrAllocator ipam.CIDRAllocator + + forcefullyDeletePod func(*v1.Pod) error +} + +// NewNodeIpamController returns a new node IP Address Management controller to +// sync instances from cloudprovider. +// This method returns an error if it is unable to initialize the CIDR bitmap with +// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes +// currently, this should be handled as a fatal error. +func NewNodeIpamController( + nodeInformer coreinformers.NodeInformer, + cloud cloudprovider.Interface, + kubeClient clientset.Interface, + clusterCIDR *net.IPNet, + serviceCIDR *net.IPNet, + nodeCIDRMaskSize int, + allocateNodeCIDRs bool, + allocatorType ipam.CIDRAllocatorType) (*Controller, error) { + + if kubeClient == nil { + glog.Fatalf("kubeClient is nil when starting Controller") + } + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + + glog.V(0).Infof("Sending events to api server.") + eventBroadcaster.StartRecordingToSink( + &v1core.EventSinkImpl{ + Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""), + }) + + if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("node_ipam_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) + } + + if allocateNodeCIDRs { + if clusterCIDR == nil { + glog.Fatal("Controller: Must specify clusterCIDR if allocateNodeCIDRs == true.") + } + mask := clusterCIDR.Mask + if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize { + glog.Fatal("Controller: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.") + } + } + + ic := &Controller{ + cloud: cloud, + kubeClient: kubeClient, + lookupIP: net.LookupIP, + clusterCIDR: clusterCIDR, + serviceCIDR: serviceCIDR, + allocateNodeCIDRs: allocateNodeCIDRs, + allocatorType: allocatorType, + } + + // TODO: Abstract this check into a generic controller manager should run method. 
+ if ic.allocateNodeCIDRs { + if ic.allocatorType == ipam.IPAMFromClusterAllocatorType || ic.allocatorType == ipam.IPAMFromCloudAllocatorType { + cfg := &ipam.Config{ + Resync: ipamResyncInterval, + MaxBackoff: ipamMaxBackoff, + InitialRetry: ipamInitialBackoff, + } + switch ic.allocatorType { + case ipam.IPAMFromClusterAllocatorType: + cfg.Mode = nodesync.SyncFromCluster + case ipam.IPAMFromCloudAllocatorType: + cfg.Mode = nodesync.SyncFromCloud + } + ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize) + if err != nil { + glog.Fatalf("Error creating ipam controller: %v", err) + } + if err := ipamc.Start(nodeInformer); err != nil { + glog.Fatalf("Error trying to Init(): %v", err) + } + } else { + var err error + ic.cidrAllocator, err = ipam.New( + kubeClient, cloud, nodeInformer, ic.allocatorType, ic.clusterCIDR, ic.serviceCIDR, nodeCIDRMaskSize) + if err != nil { + return nil, err + } + } + } + + ic.nodeLister = nodeInformer.Lister() + ic.nodeInformerSynced = nodeInformer.Informer().HasSynced + + return ic, nil +} + +// Run starts an asynchronous loop that monitors the status of cluster nodes. +func (nc *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + glog.Infof("Starting ipam controller") + defer glog.Infof("Shutting down ipam controller") + + if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced) { + return + } + + // TODO: Abstract this check into a generic controller manager should run method. + if nc.allocateNodeCIDRs { + if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType { + go nc.cidrAllocator.Run(stopCh) + } + } + + <-stopCh +} diff --git a/pkg/controller/node/BUILD b/pkg/controller/nodelifecycle/BUILD similarity index 78% rename from pkg/controller/node/BUILD rename to pkg/controller/nodelifecycle/BUILD index 57a684c8fe..25d036e3cf 100644 --- a/pkg/controller/node/BUILD +++ b/pkg/controller/nodelifecycle/BUILD @@ -1,24 +1,76 @@ -package(default_visibility = ["//visibility:public"]) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", +go_library( + name = "go_default_library", + srcs = [ + "metrics.go", + "node_lifecycle_controller.go", + ], + importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle", + visibility = ["//visibility:public"], + deps = [ + "//pkg/api/v1/node:go_default_library", + "//pkg/cloudprovider:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/nodelifecycle/scheduler:go_default_library", + "//pkg/controller/util/node:go_default_library", + "//pkg/util/metrics:go_default_library", + "//pkg/util/node:go_default_library", + "//pkg/util/system:go_default_library", + "//pkg/util/taints:go_default_library", + "//pkg/util/version:go_default_library", + "//plugin/pkg/scheduler/algorithm:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + 
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/controller/nodelifecycle/scheduler:all-srcs", + ], + tags = ["automanaged"], + visibility = ["//visibility:public"], ) go_test( name = "go_default_test", - srcs = ["nodecontroller_test.go"], + srcs = ["node_lifecycle_controller_test.go"], embed = [":go_default_library"], - importpath = "k8s.io/kubernetes/pkg/controller/node", + importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle", deps = [ "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/controller:go_default_library", - "//pkg/controller/node/ipam:go_default_library", - "//pkg/controller/node/scheduler:go_default_library", - "//pkg/controller/node/util:go_default_library", + "//pkg/controller/nodelifecycle/scheduler:go_default_library", "//pkg/controller/testutil:go_default_library", + "//pkg/controller/util/node:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/util/node:go_default_library", "//pkg/util/taints:go_default_library", @@ -39,65 +91,3 @@ go_test( "//vendor/k8s.io/client-go/testing:go_default_library", ], ) - -go_library( - name = "go_default_library", - srcs = [ - "doc.go", - "metrics.go", - "node_controller.go", - ], - importpath = "k8s.io/kubernetes/pkg/controller/node", - deps = [ - "//pkg/api/v1/node:go_default_library", - "//pkg/cloudprovider:go_default_library", - "//pkg/controller:go_default_library", - "//pkg/controller/node/ipam:go_default_library", - "//pkg/controller/node/ipam/sync:go_default_library", - "//pkg/controller/node/scheduler:go_default_library", - "//pkg/controller/node/util:go_default_library", - "//pkg/util/metrics:go_default_library", - "//pkg/util/node:go_default_library", - "//pkg/util/system:go_default_library", - "//pkg/util/taints:go_default_library", - "//plugin/pkg/scheduler/algorithm:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", - "//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library", - 
"//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", - "//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [ - ":package-srcs", - "//pkg/controller/node/ipam:all-srcs", - "//pkg/controller/node/scheduler:all-srcs", - "//pkg/controller/node/util:all-srcs", - ], - tags = ["automanaged"], -) diff --git a/pkg/controller/node/metrics.go b/pkg/controller/nodelifecycle/metrics.go similarity index 97% rename from pkg/controller/node/metrics.go rename to pkg/controller/nodelifecycle/metrics.go index 31bba5b233..ae61266c8a 100644 --- a/pkg/controller/node/metrics.go +++ b/pkg/controller/nodelifecycle/metrics.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +package nodelifecycle import ( "sync" diff --git a/pkg/controller/node/node_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go similarity index 84% rename from pkg/controller/node/node_controller.go rename to pkg/controller/nodelifecycle/node_lifecycle_controller.go index ab490d6a05..e2a47f2076 100644 --- a/pkg/controller/node/node_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,16 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +// The Controller sets tainted annotations on nodes. +// Tainted nodes should not be used for new work loads and +// some effort should be given to getting existing work +// loads off of tainted nodes. 
+ +package nodelifecycle import ( - "fmt" - "net" - "sync" - "time" - - "github.com/golang/glog" - + "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,31 +30,31 @@ import ( "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - - "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" - - "k8s.io/api/core/v1" coreinformers "k8s.io/client-go/informers/core/v1" extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" corelisters "k8s.io/client-go/listers/core/v1" extensionslisters "k8s.io/client-go/listers/extensions/v1beta1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/flowcontrol" v1node "k8s.io/kubernetes/pkg/api/v1/node" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/node/ipam" - nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync" - "k8s.io/kubernetes/pkg/controller/node/scheduler" - "k8s.io/kubernetes/pkg/controller/node/util" + "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" "k8s.io/kubernetes/pkg/util/metrics" utilnode "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/system" taintutils "k8s.io/kubernetes/pkg/util/taints" + utilversion "k8s.io/kubernetes/pkg/util/version" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + + "fmt" + "github.com/golang/glog" + "sync" + "time" ) func init() { @@ -64,11 +63,14 @@ func init() { } var ( + gracefulDeletionVersion = utilversion.MustParseSemantic("v1.1.0") + // UnreachableTaintTemplate is the taint for when a node becomes unreachable. UnreachableTaintTemplate = &v1.Taint{ Key: algorithm.TaintNodeUnreachable, Effect: v1.TaintEffectNoExecute, } + // NotReadyTaintTemplate is the taint for when a node is not ready for // executing pods NotReadyTaintTemplate = &v1.Taint{ @@ -91,23 +93,6 @@ var ( } ) -const ( - // The amount of time the nodecontroller polls on the list nodes endpoint. - apiserverStartupGracePeriod = 10 * time.Minute - // The amount of time the nodecontroller should sleep between retrying NodeStatus updates - retrySleepTime = 20 * time.Millisecond - - // ipamResyncInterval is the amount of time between when the cloud and node - // CIDR range assignments are synchronized. - ipamResyncInterval = 30 * time.Second - // ipamMaxBackoff is the maximum backoff for retrying synchronization of a - // given in the error state. - ipamMaxBackoff = 10 * time.Second - // ipamInitialRetry is the initial retry interval for retrying synchronization of a - // given in the error state. - ipamInitialBackoff = 250 * time.Millisecond -) - // ZoneState is the state of a given zone. type ZoneState string @@ -118,24 +103,68 @@ const ( statePartialDisruption = ZoneState("PartialDisruption") ) +const ( + // The amount of time the nodecontroller polls on the list nodes endpoint. 
+ apiserverStartupGracePeriod = 10 * time.Minute + // The amount of time the nodecontroller should sleep between retrying NodeStatus updates + retrySleepTime = 20 * time.Millisecond +) + type nodeStatusData struct { probeTimestamp metav1.Time readyTransitionTimestamp metav1.Time status v1.NodeStatus } -// Controller is the controller that manages node related cluster state. +// Controller is the controller that manages node's life cycle. type Controller struct { - allocateNodeCIDRs bool - allocatorType ipam.CIDRAllocatorType + taintManager *scheduler.NoExecuteTaintManager + + podInformerSynced cache.InformerSynced + cloud cloudprovider.Interface + kubeClient clientset.Interface + + // This timestamp is to be used instead of LastProbeTime stored in Condition. We do this + // to aviod the problem with time skew across the cluster. + now func() metav1.Time + + enterPartialDisruptionFunc func(nodeNum int) float32 + enterFullDisruptionFunc func(nodeNum int) float32 + computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState) - cloud cloudprovider.Interface - clusterCIDR *net.IPNet - serviceCIDR *net.IPNet knownNodeSet map[string]*v1.Node - kubeClient clientset.Interface - // Method for easy mocking in unittest. - lookupIP func(host string) ([]net.IP, error) + // per Node map storing last observed Status together with a local time when it was observed. + nodeStatusMap map[string]nodeStatusData + + // Lock to access evictor workers + evictorLock sync.Mutex + + // workers that evicts pods from unresponsive nodes. + zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue + + // workers that are responsible for tainting nodes. + zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue + + zoneStates map[string]ZoneState + + daemonSetStore extensionslisters.DaemonSetLister + daemonSetInformerSynced cache.InformerSynced + + nodeLister corelisters.NodeLister + nodeInformerSynced cache.InformerSynced + nodeExistsInCloudProvider func(types.NodeName) (bool, error) + + recorder record.EventRecorder + + // Value controlling Controller monitoring period, i.e. how often does Controller + // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod. + // TODO: Change node status monitor to watch based. + nodeMonitorPeriod time.Duration + + // Value used if sync_nodes_status=False, only for node startup. When node + // is just created, e.g. cluster bootstrap or node creation, we give a longer grace period. + nodeStartupGracePeriod time.Duration + // Value used if sync_nodes_status=False. Controller will not proactively // sync node status in this case, but will monitor node status updated from kubelet. If // it doesn't receive update for this amount of time, it will start posting "NodeReady== @@ -151,45 +180,8 @@ type Controller struct { // 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes // longer for user to see up-to-date node status. nodeMonitorGracePeriod time.Duration - // Value controlling Controller monitoring period, i.e. how often does Controller - // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod. - // TODO: Change node status monitor to watch based. - nodeMonitorPeriod time.Duration - // Value used if sync_nodes_status=False, only for node startup. When node - // is just created, e.g. cluster bootstrap or node creation, we give a longer grace period. 
- nodeStartupGracePeriod time.Duration - // per Node map storing last observed Status together with a local time when it was observed. - nodeStatusMap map[string]nodeStatusData - // This timestamp is to be used instead of LastProbeTime stored in Condition. We do this - // to aviod the problem with time skew across the cluster. - now func() metav1.Time - // Lock to access evictor workers - evictorLock sync.Mutex - // workers that evicts pods from unresponsive nodes. - zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue - // workers that are responsible for tainting nodes. - zoneNoExecuteTainer map[string]*scheduler.RateLimitedTimedQueue - podEvictionTimeout time.Duration - // The maximum duration before a pod evicted from a node can be forcefully terminated. - maximumGracePeriod time.Duration - recorder record.EventRecorder - nodeLister corelisters.NodeLister - nodeInformerSynced cache.InformerSynced - - daemonSetStore extensionslisters.DaemonSetLister - daemonSetInformerSynced cache.InformerSynced - - podInformerSynced cache.InformerSynced - cidrAllocator ipam.CIDRAllocator - taintManager *scheduler.NoExecuteTaintManager - - nodeExistsInCloudProvider func(types.NodeName) (bool, error) - computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState) - enterPartialDisruptionFunc func(nodeNum int) float32 - enterFullDisruptionFunc func(nodeNum int) float32 - - zoneStates map[string]ZoneState + podEvictionTimeout time.Duration evictionLimiterQPS float32 secondaryEvictionLimiterQPS float32 largeClusterThreshold int32 @@ -208,29 +200,20 @@ type Controller struct { taintNodeByCondition bool } -// NewNodeController returns a new node controller to sync instances from cloudprovider. -// This method returns an error if it is unable to initialize the CIDR bitmap with -// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes -// currently, this should be handled as a fatal error. -func NewNodeController( - podInformer coreinformers.PodInformer, +// NewNodeLifecycleController returns a new taint controller. 
+func NewNodeLifecycleController(podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, daemonSetInformer extensionsinformers.DaemonSetInformer, cloud cloudprovider.Interface, kubeClient clientset.Interface, + nodeMonitorPeriod time.Duration, + nodeStartupGracePeriod time.Duration, + nodeMonitorGracePeriod time.Duration, podEvictionTimeout time.Duration, evictionLimiterQPS float32, secondaryEvictionLimiterQPS float32, largeClusterThreshold int32, unhealthyZoneThreshold float32, - nodeMonitorGracePeriod time.Duration, - nodeStartupGracePeriod time.Duration, - nodeMonitorPeriod time.Duration, - clusterCIDR *net.IPNet, - serviceCIDR *net.IPNet, - nodeCIDRMaskSize int, - allocateNodeCIDRs bool, - allocatorType ipam.CIDRAllocatorType, runTaintManager bool, useTaintBasedEvictions bool, taintNodeByCondition bool) (*Controller, error) { @@ -241,55 +224,32 @@ func NewNodeController( eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"}) - eventBroadcaster.StartLogging(glog.Infof) - - glog.V(0).Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink( - &v1core.EventSinkImpl{ - Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""), - }) if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { - metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) - } - - if allocateNodeCIDRs { - if clusterCIDR == nil { - glog.Fatal("Controller: Must specify clusterCIDR if allocateNodeCIDRs == true.") - } - mask := clusterCIDR.Mask - if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize { - glog.Fatalf("Controller: Invalid clusterCIDR, mask size of clusterCIDR(%d) must be less than nodeCIDRMaskSize(%d).", maskSize, nodeCIDRMaskSize) - } + metrics.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) } nc := &Controller{ - cloud: cloud, - knownNodeSet: make(map[string]*v1.Node), - kubeClient: kubeClient, - recorder: recorder, - podEvictionTimeout: podEvictionTimeout, - maximumGracePeriod: 5 * time.Minute, - zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue), - zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue), - nodeStatusMap: make(map[string]nodeStatusData), - nodeMonitorGracePeriod: nodeMonitorGracePeriod, - nodeMonitorPeriod: nodeMonitorPeriod, - nodeStartupGracePeriod: nodeStartupGracePeriod, - lookupIP: net.LookupIP, - now: metav1.Now, - clusterCIDR: clusterCIDR, - serviceCIDR: serviceCIDR, - allocateNodeCIDRs: allocateNodeCIDRs, - allocatorType: allocatorType, + cloud: cloud, + kubeClient: kubeClient, + now: metav1.Now, + knownNodeSet: make(map[string]*v1.Node), + nodeStatusMap: make(map[string]nodeStatusData), nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { - return util.NodeExistsInCloudProvider(cloud, nodeName) + return nodeutil.ExistsInCloudProvider(cloud, nodeName) }, + recorder: recorder, + nodeMonitorPeriod: nodeMonitorPeriod, + nodeStartupGracePeriod: nodeStartupGracePeriod, + nodeMonitorGracePeriod: nodeMonitorGracePeriod, + zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue), + zoneNoExecuteTainter: make(map[string]*scheduler.RateLimitedTimedQueue), + zoneStates: make(map[string]ZoneState), + podEvictionTimeout: podEvictionTimeout, evictionLimiterQPS: evictionLimiterQPS, secondaryEvictionLimiterQPS: 
secondaryEvictionLimiterQPS, largeClusterThreshold: largeClusterThreshold, unhealthyZoneThreshold: unhealthyZoneThreshold, - zoneStates: make(map[string]ZoneState), runTaintManager: runTaintManager, useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager, taintNodeByCondition: taintNodeByCondition, @@ -297,6 +257,7 @@ func NewNodeController( if useTaintBasedEvictions { glog.Infof("Controller is using taint based evictions.") } + nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc nc.enterFullDisruptionFunc = nc.HealthyQPSFunc nc.computeZoneStateFunc = nc.ComputeZoneState @@ -337,48 +298,18 @@ func NewNodeController( }) nc.podInformerSynced = podInformer.Informer().HasSynced - if nc.allocateNodeCIDRs { - if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType { - cfg := &ipam.Config{ - Resync: ipamResyncInterval, - MaxBackoff: ipamMaxBackoff, - InitialRetry: ipamInitialBackoff, - } - switch nc.allocatorType { - case ipam.IPAMFromClusterAllocatorType: - cfg.Mode = nodesync.SyncFromCluster - case ipam.IPAMFromCloudAllocatorType: - cfg.Mode = nodesync.SyncFromCloud - } - ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize) - if err != nil { - glog.Fatalf("Error creating ipam controller: %v", err) - } - if err := ipamc.Start(nodeInformer); err != nil { - glog.Fatalf("Error trying to Init(): %v", err) - } - } else { - var err error - nc.cidrAllocator, err = ipam.New( - kubeClient, cloud, nodeInformer, nc.allocatorType, nc.clusterCIDR, nc.serviceCIDR, nodeCIDRMaskSize) - if err != nil { - return nil, err - } - } - } - if nc.runTaintManager { nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient) nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error { + AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { nc.taintManager.NodeUpdated(nil, node) return nil }), - UpdateFunc: util.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error { + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error { nc.taintManager.NodeUpdated(oldNode, newNode) return nil }), - DeleteFunc: util.CreateDeleteNodeHandler(func(node *v1.Node) error { + DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error { nc.taintManager.NodeUpdated(node, nil) return nil }), @@ -388,10 +319,10 @@ func NewNodeController( if nc.taintNodeByCondition { glog.Infof("Controller will taint node by condition.") nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error { + AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { return nc.doNoScheduleTaintingPass(node) }), - UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { return nc.doNoScheduleTaintingPass(newNode) }), }) @@ -400,10 +331,10 @@ func NewNodeController( // NOTE(resouer): nodeInformer to substitute deprecated taint key (notReady -> not-ready). 
// Remove this logic when we don't need this backwards compatibility nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error { + AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { return nc.doFixDeprecatedTaintKeyPass(node) }), - UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { return nc.doFixDeprecatedTaintKeyPass(newNode) }), }) @@ -417,33 +348,40 @@ func NewNodeController( return nc, nil } -func (nc *Controller) doEvictionPass() { - nc.evictorLock.Lock() - defer nc.evictorLock.Unlock() - for k := range nc.zonePodEvictor { - // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded). - nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) { - node, err := nc.nodeLister.Get(value.Value) - if apierrors.IsNotFound(err) { - glog.Warningf("Node %v no longer present in nodeLister!", value.Value) - } else if err != nil { - glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err) - } else { - zone := utilnode.GetZoneKey(node) - evictionsNumber.WithLabelValues(zone).Inc() - } - nodeUID, _ := value.UID.(string) - remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) - return false, 0 - } - if remaining { - glog.Infof("Pods awaiting deletion due to Controller eviction") - } - return true, 0 - }) +// Run starts an asynchronous loop that monitors the status of cluster nodes. +func (nc *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + glog.Infof("Starting node controller") + defer glog.Infof("Shutting down node controller") + + if !controller.WaitForCacheSync("taint", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) { + return } + + if nc.runTaintManager { + go nc.taintManager.Run(wait.NeverStop) + } + + if nc.useTaintBasedEvictions { + // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated + // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints. + go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop) + } else { + // Managing eviction of nodes: + // When we delete pods off a node, if the node was not empty at the time we then + // queue an eviction watcher. If we hit an error, retry deletion. + go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop) + } + + // Incorporate the results of node status pushed from kubelet to master. + go wait.Until(func() { + if err := nc.monitorNodeStatus(); err != nil { + glog.Errorf("Error monitoring node status: %v", err) + } + }, nc.nodeMonitorPeriod, wait.NeverStop) + + <-stopCh } // doFixDeprecatedTaintKeyPass checks and replaces deprecated taint key with proper key name if needed. 
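For context, here is a minimal sketch of how a caller might construct and start the renamed controller using the reordered arguments shown above: the three monitor/grace durations (nodeMonitorPeriod, nodeStartupGracePeriod, nodeMonitorGracePeriod) now precede the eviction tuning knobs, and the CIDR/allocator parameters no longer appear in this constructor. The wrapper function name, resync period, numeric values, and informer wiring below are assumptions chosen for illustration only; they are not code from this patch.

// Illustrative sketch only (not part of this patch): wiring for the split-out
// node lifecycle controller. Helper name and parameter values are assumptions.
package example

import (
	"time"

	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/controller/nodelifecycle"
)

func startNodeLifecycleController(kubeClient clientset.Interface, cloud cloudprovider.Interface, stopCh <-chan struct{}) error {
	factory := informers.NewSharedInformerFactory(kubeClient, 0)
	nc, err := nodelifecycle.NewNodeLifecycleController(
		factory.Core().V1().Pods(),
		factory.Core().V1().Nodes(),
		factory.Extensions().V1beta1().DaemonSets(),
		cloud,      // still required: used for node-existence checks in the cloud provider
		kubeClient,
		5*time.Second,  // nodeMonitorPeriod
		60*time.Second, // nodeStartupGracePeriod
		40*time.Second, // nodeMonitorGracePeriod
		5*time.Minute,  // podEvictionTimeout
		0.1,            // evictionLimiterQPS
		0.01,           // secondaryEvictionLimiterQPS
		50,             // largeClusterThreshold
		0.55,           // unhealthyZoneThreshold
		true,           // runTaintManager
		true,           // useTaintBasedEvictions (only honored when runTaintManager is true)
		false,          // taintNodeByCondition
	)
	if err != nil {
		return err
	}
	// Start the shared informers, then run the controller.
	// Run blocks until stopCh is closed, so it goes on its own goroutine.
	factory.Start(stopCh)
	go nc.Run(stopCh)
	return nil
}

As the Run hunk above shows, the controller itself decides between the NoExecute tainting pass and the pod eviction pass based on useTaintBasedEvictions, and keeps monitoring node status every nodeMonitorPeriod until the stop channel closes.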
@@ -478,7 +416,7 @@ func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error { glog.Warningf("Detected deprecated taint keys: %v on node: %v, will substitute them with %v", taintsToDel, node.GetName(), taintsToAdd) - if !util.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) { + if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) { return fmt.Errorf("failed to swap taints of node %+v", node) } return nil @@ -506,7 +444,7 @@ func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error { if len(taintsToAdd) == 0 && len(taintsToDel) == 0 { return nil } - if !util.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) { + if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) { return fmt.Errorf("failed to swap taints of node %+v", node) } return nil @@ -515,9 +453,9 @@ func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error { func (nc *Controller) doNoExecuteTaintingPass() { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() - for k := range nc.zoneNoExecuteTainer { + for k := range nc.zoneNoExecuteTainter { // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded). - nc.zoneNoExecuteTainer[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) { + nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) { node, err := nc.nodeLister.Get(value.Value) if apierrors.IsNotFound(err) { glog.Warningf("Node %v no longer present in nodeLister!", value.Value) @@ -546,70 +484,37 @@ func (nc *Controller) doNoExecuteTaintingPass() { return true, 0 } - return util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0 + return nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0 }) } } -// Run starts an asynchronous loop that monitors the status of cluster nodes. -func (nc *Controller) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - - glog.Infof("Starting node controller") - defer glog.Infof("Shutting down node controller") - - if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) { - return - } - - // Incorporate the results of node status pushed from kubelet to master. - go wait.Until(func() { - if err := nc.monitorNodeStatus(); err != nil { - glog.Errorf("Error monitoring node status: %v", err) - } - }, nc.nodeMonitorPeriod, wait.NeverStop) - - if nc.runTaintManager { - go nc.taintManager.Run(wait.NeverStop) - } - - if nc.useTaintBasedEvictions { - // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated - // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints. - go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop) - } else { - // Managing eviction of nodes: - // When we delete pods off a node, if the node was not empty at the time we then - // queue an eviction watcher. If we hit an error, retry deletion. 
- go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop) - } - - if nc.allocateNodeCIDRs { - if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType { - go nc.cidrAllocator.Run(wait.NeverStop) - } - } - - <-stopCh -} - -// addPodEvictorForNewZone checks if new zone appeared, and if so add new evictor. -func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) { - zone := utilnode.GetZoneKey(node) - if _, found := nc.zoneStates[zone]; !found { - nc.zoneStates[zone] = stateInitial - if !nc.useTaintBasedEvictions { - nc.zonePodEvictor[zone] = - scheduler.NewRateLimitedTimedQueue( - flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst)) - } else { - nc.zoneNoExecuteTainer[zone] = - scheduler.NewRateLimitedTimedQueue( - flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst)) - } - // Init the metric for the new zone. - glog.Infof("Initializing eviction metric for zone: %v", zone) - evictionsNumber.WithLabelValues(zone).Add(0) +func (nc *Controller) doEvictionPass() { + nc.evictorLock.Lock() + defer nc.evictorLock.Unlock() + for k := range nc.zonePodEvictor { + // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded). + nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) { + node, err := nc.nodeLister.Get(value.Value) + if apierrors.IsNotFound(err) { + glog.Warningf("Node %v no longer present in nodeLister!", value.Value) + } else if err != nil { + glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err) + } else { + zone := utilnode.GetZoneKey(node) + evictionsNumber.WithLabelValues(zone).Inc() + } + nodeUID, _ := value.UID.(string) + remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) + return false, 0 + } + if remaining { + glog.Infof("Pods awaiting deletion due to Controller eviction") + } + return true, 0 + }) } } @@ -631,7 +536,7 @@ func (nc *Controller) monitorNodeStatus() error { for i := range added { glog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name) - util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name)) + nodeutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name)) nc.knownNodeSet[added[i].Name] = added[i] nc.addPodEvictorForNewZone(added[i]) if nc.useTaintBasedEvictions { @@ -643,7 +548,7 @@ func (nc *Controller) monitorNodeStatus() error { for i := range deleted { glog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name) - util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name)) + nodeutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name)) delete(nc.knownNodeSet, deleted[i].Name) } @@ -684,7 +589,7 @@ func (nc *Controller) monitorNodeStatus() error { // We want to update the taint straight away if Node is already tainted with the 
UnreachableTaint if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) { taintToAdd := *NotReadyTaintTemplate - if !util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) { + if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) { glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.") } } else if nc.markNodeForTainting(node) { @@ -711,7 +616,7 @@ func (nc *Controller) monitorNodeStatus() error { // We want to update the taint straight away if Node is already tainted with the UnreachableTaint if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) { taintToAdd := *UnreachableTaintTemplate - if !util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) { + if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) { glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.") } } else if nc.markNodeForTainting(node) { @@ -751,8 +656,8 @@ func (nc *Controller) monitorNodeStatus() error { // Report node event. if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue { - util.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady") - if err = util.MarkAllPodsNotReady(nc.kubeClient, node); err != nil { + nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady") + if err = nodeutil.MarkAllPodsNotReady(nc.kubeClient, node); err != nil { utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err)) } } @@ -767,13 +672,13 @@ func (nc *Controller) monitorNodeStatus() error { } if !exists { glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name) - util.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) + nodeutil.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) go func(nodeName string) { defer utilruntime.HandleCrash() // Kubelet is not reporting and Cloud Provider says node // is gone. Delete it without worrying about grace // periods. 
- if err := util.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil { + if err := nodeutil.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil { glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err) } }(node.Name) @@ -786,131 +691,6 @@ func (nc *Controller) monitorNodeStatus() error { return nil } -func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) { - newZoneStates := map[string]ZoneState{} - allAreFullyDisrupted := true - for k, v := range zoneToNodeConditions { - zoneSize.WithLabelValues(k).Set(float64(len(v))) - unhealthy, newState := nc.computeZoneStateFunc(v) - zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v))) - unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy)) - if newState != stateFullDisruption { - allAreFullyDisrupted = false - } - newZoneStates[k] = newState - if _, had := nc.zoneStates[k]; !had { - glog.Errorf("Setting initial state for unseen zone: %v", k) - nc.zoneStates[k] = stateInitial - } - } - - allWasFullyDisrupted := true - for k, v := range nc.zoneStates { - if _, have := zoneToNodeConditions[k]; !have { - zoneSize.WithLabelValues(k).Set(0) - zoneHealth.WithLabelValues(k).Set(100) - unhealthyNodes.WithLabelValues(k).Set(0) - delete(nc.zoneStates, k) - continue - } - if v != stateFullDisruption { - allWasFullyDisrupted = false - break - } - } - - // At least one node was responding in previous pass or in the current pass. Semantics is as follows: - // - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use, - // - if the new state is "normal" we resume normal operation (go back to default limiter settings), - // - if new state is "fullDisruption" we restore normal eviction rate, - // - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions. - if !allAreFullyDisrupted || !allWasFullyDisrupted { - // We're switching to full disruption mode - if allAreFullyDisrupted { - glog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.") - for i := range nodes { - if nc.useTaintBasedEvictions { - _, err := nc.markNodeAsReachable(nodes[i]) - if err != nil { - glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name) - } - } else { - nc.cancelPodEviction(nodes[i]) - } - } - // We stop all evictions. - for k := range nc.zoneStates { - if nc.useTaintBasedEvictions { - nc.zoneNoExecuteTainer[k].SwapLimiter(0) - } else { - nc.zonePodEvictor[k].SwapLimiter(0) - } - } - for k := range nc.zoneStates { - nc.zoneStates[k] = stateFullDisruption - } - // All rate limiters are updated, so we can return early here. - return - } - // We're exiting full disruption mode - if allWasFullyDisrupted { - glog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.") - // When exiting disruption mode update probe timestamps on all Nodes. - now := nc.now() - for i := range nodes { - v := nc.nodeStatusMap[nodes[i].Name] - v.probeTimestamp = now - v.readyTransitionTimestamp = now - nc.nodeStatusMap[nodes[i].Name] = v - } - // We reset all rate limiters to settings appropriate for the given state. 
- for k := range nc.zoneStates { - nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k]) - nc.zoneStates[k] = newZoneStates[k] - } - return - } - // We know that there's at least one not-fully disrupted so, - // we can use default behavior for rate limiters - for k, v := range nc.zoneStates { - newState := newZoneStates[k] - if v == newState { - continue - } - glog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState) - nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState) - nc.zoneStates[k] = newState - } - } -} - -func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) { - switch state { - case stateNormal: - if nc.useTaintBasedEvictions { - nc.zoneNoExecuteTainer[zone].SwapLimiter(nc.evictionLimiterQPS) - } else { - nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS) - } - case statePartialDisruption: - if nc.useTaintBasedEvictions { - nc.zoneNoExecuteTainer[zone].SwapLimiter( - nc.enterPartialDisruptionFunc(zoneSize)) - } else { - nc.zonePodEvictor[zone].SwapLimiter( - nc.enterPartialDisruptionFunc(zoneSize)) - } - case stateFullDisruption: - if nc.useTaintBasedEvictions { - nc.zoneNoExecuteTainer[zone].SwapLimiter( - nc.enterFullDisruptionFunc(zoneSize)) - } else { - nc.zonePodEvictor[zone].SwapLimiter( - nc.enterFullDisruptionFunc(zoneSize)) - } - } -} - // tryUpdateNodeStatus checks a given node's conditions and tries to update it. Returns grace period to // which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred. func (nc *Controller) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) { @@ -1082,6 +862,131 @@ func (nc *Controller) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.Node return gracePeriod, observedReadyCondition, currentReadyCondition, err } +func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) { + newZoneStates := map[string]ZoneState{} + allAreFullyDisrupted := true + for k, v := range zoneToNodeConditions { + zoneSize.WithLabelValues(k).Set(float64(len(v))) + unhealthy, newState := nc.computeZoneStateFunc(v) + zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v))) + unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy)) + if newState != stateFullDisruption { + allAreFullyDisrupted = false + } + newZoneStates[k] = newState + if _, had := nc.zoneStates[k]; !had { + glog.Errorf("Setting initial state for unseen zone: %v", k) + nc.zoneStates[k] = stateInitial + } + } + + allWasFullyDisrupted := true + for k, v := range nc.zoneStates { + if _, have := zoneToNodeConditions[k]; !have { + zoneSize.WithLabelValues(k).Set(0) + zoneHealth.WithLabelValues(k).Set(100) + unhealthyNodes.WithLabelValues(k).Set(0) + delete(nc.zoneStates, k) + continue + } + if v != stateFullDisruption { + allWasFullyDisrupted = false + break + } + } + + // At least one node was responding in previous pass or in the current pass. Semantics is as follows: + // - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use, + // - if the new state is "normal" we resume normal operation (go back to default limiter settings), + // - if new state is "fullDisruption" we restore normal eviction rate, + // - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions. 
+ if !allAreFullyDisrupted || !allWasFullyDisrupted { + // We're switching to full disruption mode + if allAreFullyDisrupted { + glog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.") + for i := range nodes { + if nc.useTaintBasedEvictions { + _, err := nc.markNodeAsReachable(nodes[i]) + if err != nil { + glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name) + } + } else { + nc.cancelPodEviction(nodes[i]) + } + } + // We stop all evictions. + for k := range nc.zoneStates { + if nc.useTaintBasedEvictions { + nc.zoneNoExecuteTainter[k].SwapLimiter(0) + } else { + nc.zonePodEvictor[k].SwapLimiter(0) + } + } + for k := range nc.zoneStates { + nc.zoneStates[k] = stateFullDisruption + } + // All rate limiters are updated, so we can return early here. + return + } + // We're exiting full disruption mode + if allWasFullyDisrupted { + glog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.") + // When exiting disruption mode update probe timestamps on all Nodes. + now := nc.now() + for i := range nodes { + v := nc.nodeStatusMap[nodes[i].Name] + v.probeTimestamp = now + v.readyTransitionTimestamp = now + nc.nodeStatusMap[nodes[i].Name] = v + } + // We reset all rate limiters to settings appropriate for the given state. + for k := range nc.zoneStates { + nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k]) + nc.zoneStates[k] = newZoneStates[k] + } + return + } + // We know that there's at least one not-fully disrupted so, + // we can use default behavior for rate limiters + for k, v := range nc.zoneStates { + newState := newZoneStates[k] + if v == newState { + continue + } + glog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState) + nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState) + nc.zoneStates[k] = newState + } + } +} + +func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) { + switch state { + case stateNormal: + if nc.useTaintBasedEvictions { + nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS) + } else { + nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS) + } + case statePartialDisruption: + if nc.useTaintBasedEvictions { + nc.zoneNoExecuteTainter[zone].SwapLimiter( + nc.enterPartialDisruptionFunc(zoneSize)) + } else { + nc.zonePodEvictor[zone].SwapLimiter( + nc.enterPartialDisruptionFunc(zoneSize)) + } + case stateFullDisruption: + if nc.useTaintBasedEvictions { + nc.zoneNoExecuteTainter[zone].SwapLimiter( + nc.enterFullDisruptionFunc(zoneSize)) + } else { + nc.zonePodEvictor[zone].SwapLimiter( + nc.enterFullDisruptionFunc(zoneSize)) + } + } +} + // classifyNodes classifies the allNodes to three categories: // 1. added: the nodes that in 'allNodes', but not in 'knownNodeSet' // 2. deleted: the nodes that in 'knownNodeSet', but not in 'allNodes' @@ -1116,6 +1021,41 @@ func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZon return } +// HealthyQPSFunc returns the default value for cluster eviction rate - we take +// nodeNum for consistency with ReducedQPSFunc. +func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 { + return nc.evictionLimiterQPS +} + +// ReducedQPSFunc returns the QPS for when a the cluster is large make +// evictions slower, if they're small stop evictions altogether. 
+func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 { + if int32(nodeNum) > nc.largeClusterThreshold { + return nc.secondaryEvictionLimiterQPS + } + return 0 +} + +// addPodEvictorForNewZone checks if new zone appeared, and if so add new evictor. +func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) { + zone := utilnode.GetZoneKey(node) + if _, found := nc.zoneStates[zone]; !found { + nc.zoneStates[zone] = stateInitial + if !nc.useTaintBasedEvictions { + nc.zonePodEvictor[zone] = + scheduler.NewRateLimitedTimedQueue( + flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst)) + } else { + nc.zoneNoExecuteTainter[zone] = + scheduler.NewRateLimitedTimedQueue( + flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst)) + } + // Init the metric for the new zone. + glog.Infof("Initializing eviction metric for zone: %v", zone) + evictionsNumber.WithLabelValues(zone).Add(0) + } +} + // cancelPodEviction removes any queued evictions, typically because the node is available again. It // returns true if an eviction was queued. func (nc *Controller) cancelPodEviction(node *v1.Node) bool { @@ -1141,7 +1081,7 @@ func (nc *Controller) evictPods(node *v1.Node) bool { func (nc *Controller) markNodeForTainting(node *v1.Node) bool { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() - return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)) + return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)) } func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) { @@ -1157,22 +1097,7 @@ func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) { glog.Errorf("Failed to remove taint from node %v: %v", node.Name, err) return false, err } - return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Remove(node.Name), nil -} - -// HealthyQPSFunc returns the default value for cluster eviction rate - we take -// nodeNum for consistency with ReducedQPSFunc. -func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 { - return nc.evictionLimiterQPS -} - -// ReducedQPSFunc returns the QPS for when a the cluster is large make -// evictions slower, if they're small stop evictions altogether. -func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 { - if int32(nodeNum) > nc.largeClusterThreshold { - return nc.secondaryEvictionLimiterQPS - } - return 0 + return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Remove(node.Name), nil } // ComputeZoneState returns a slice of NodeReadyConditions for all Nodes in a given zone. diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go similarity index 93% rename from pkg/controller/node/nodecontroller_test.go rename to pkg/controller/nodelifecycle/node_lifecycle_controller_test.go index a871dcbc6e..93affbb9d6 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,10 +14,9 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package node +package nodelifecycle import ( - "net" "strings" "testing" "time" @@ -39,10 +38,9 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/node/ipam" - "k8s.io/kubernetes/pkg/controller/node/scheduler" - "k8s.io/kubernetes/pkg/controller/node/util" + "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler" "k8s.io/kubernetes/pkg/controller/testutil" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/util/node" taintutils "k8s.io/kubernetes/pkg/util/taints" @@ -60,13 +58,46 @@ const ( func alwaysReady() bool { return true } -type nodeController struct { +type nodeLifecycleController struct { *Controller nodeInformer coreinformers.NodeInformer daemonSetInformer extensionsinformers.DaemonSetInformer } -func newNodeControllerFromClient( +// doEviction does the fake eviction and returns the status of eviction operation. +func (nc *nodeLifecycleController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler) bool { + var podEvicted bool + zones := testutil.GetZones(fakeNodeHandler) + for _, zone := range zones { + nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { + uid, _ := value.UID.(string) + nodeutil.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore) + return true, 0 + }) + } + + for _, action := range fakeNodeHandler.Actions() { + if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { + podEvicted = true + return podEvicted + } + } + return podEvicted +} + +func (nc *nodeLifecycleController) syncNodeStore(fakeNodeHandler *testutil.FakeNodeHandler) error { + nodes, err := fakeNodeHandler.List(metav1.ListOptions{}) + if err != nil { + return err + } + newElems := make([]interface{}, 0, len(nodes.Items)) + for i := range nodes.Items { + newElems = append(newElems, &nodes.Items[i]) + } + return nc.nodeInformer.Informer().GetStore().Replace(newElems, "newRV") +} + +func newNodeLifecycleControllerFromClient( cloud cloudprovider.Interface, kubeClient clientset.Interface, podEvictionTimeout time.Duration, @@ -77,37 +108,28 @@ func newNodeControllerFromClient( nodeMonitorGracePeriod time.Duration, nodeStartupGracePeriod time.Duration, nodeMonitorPeriod time.Duration, - clusterCIDR *net.IPNet, - serviceCIDR *net.IPNet, - nodeCIDRMaskSize int, - allocateNodeCIDRs bool, useTaints bool, -) (*nodeController, error) { +) (*nodeLifecycleController, error) { factory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) nodeInformer := factory.Core().V1().Nodes() daemonSetInformer := factory.Extensions().V1beta1().DaemonSets() - nc, err := NewNodeController( + nc, err := NewNodeLifecycleController( factory.Core().V1().Pods(), nodeInformer, daemonSetInformer, cloud, kubeClient, + nodeMonitorPeriod, + nodeStartupGracePeriod, + nodeMonitorGracePeriod, podEvictionTimeout, evictionLimiterQPS, secondaryEvictionLimiterQPS, largeClusterThreshold, unhealthyZoneThreshold, - nodeMonitorGracePeriod, - nodeStartupGracePeriod, - nodeMonitorPeriod, - clusterCIDR, - serviceCIDR, - nodeCIDRMaskSize, - allocateNodeCIDRs, - ipam.RangeAllocatorType, useTaints, useTaints, useTaints, @@ -120,19 +142,7 @@ func newNodeControllerFromClient( nc.nodeInformerSynced = alwaysReady nc.daemonSetInformerSynced = alwaysReady - return &nodeController{nc, nodeInformer, daemonSetInformer}, nil -} - -func 
syncNodeStore(nc *nodeController, fakeNodeHandler *testutil.FakeNodeHandler) error { - nodes, err := fakeNodeHandler.List(metav1.ListOptions{}) - if err != nil { - return err - } - newElems := make([]interface{}, 0, len(nodes.Items)) - for i := range nodes.Items { - newElems = append(newElems, &nodes.Items[i]) - } - return nc.nodeInformer.Informer().GetStore().Replace(newElems, "newRV") + return &nodeLifecycleController{nc, nodeInformer, daemonSetInformer}, nil } func TestMonitorNodeStatusEvictPods(t *testing.T) { @@ -597,7 +607,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } for _, item := range table { - nodeController, _ := newNodeControllerFromClient( + nodeController, _ := newNodeLifecycleControllerFromClient( nil, item.fakeNodeHandler, evictionTimeout, @@ -608,17 +618,13 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, - nil, - nil, - 0, - false, false) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() for _, ds := range item.daemonSets { nodeController.daemonSetInformer.Informer().GetStore().Add(&ds) } - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -633,7 +639,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { item.fakeNodeHandler.Existing[0].Labels = labels item.fakeNodeHandler.Existing[1].Labels = labels } - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -644,7 +650,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { if _, ok := nodeController.zonePodEvictor[zone]; ok { nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nodeUID, _ := value.UID.(string) - util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister()) + nodeutil.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister()) return true, 0 }) } else { @@ -763,12 +769,21 @@ func TestPodStatusChange(t *testing.T) { } for _, item := range table { - nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, - evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + item.fakeNodeHandler, + evictionTimeout, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -779,7 +794,7 @@ func TestPodStatusChange(t *testing.T) { 
item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus item.fakeNodeHandler.Existing[1].Status = item.secondNodeNewStatus } - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -789,7 +804,7 @@ func TestPodStatusChange(t *testing.T) { for _, zone := range zones { nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nodeUID, _ := value.UID.(string) - util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore) + nodeutil.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore) return true, 0 }) } @@ -809,7 +824,6 @@ func TestPodStatusChange(t *testing.T) { t.Errorf("expected pod update: %+v, got %+v for %+v", podReasonUpdate, item.expectedPodUpdate, item.description) } } - } func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { @@ -1280,9 +1294,18 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { Existing: item.nodeList, Clientset: fake.NewSimpleClientset(&v1.PodList{Items: item.podList}), } - nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, - evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fakeNodeHandler, + evictionTimeout, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.now = func() metav1.Time { return fakeNow } nodeController.enterPartialDisruptionFunc = func(nodeNum int) float32 { return testRateLimiterQPS @@ -1291,7 +1314,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { nodeController.enterFullDisruptionFunc = func(nodeNum int) float32 { return testRateLimiterQPS } - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1309,7 +1332,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { fakeNodeHandler.Existing[i].Status = item.updatedNodeStatuses[i] } - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1337,27 +1360,6 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { } } -// doEviction does the fake eviction and returns the status of eviction operation. 
-func (nc *nodeController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler) bool { - var podEvicted bool - zones := testutil.GetZones(fakeNodeHandler) - for _, zone := range zones { - nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { - uid, _ := value.UID.(string) - util.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore) - return true, 0 - }) - } - - for _, action := range fakeNodeHandler.Actions() { - if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { - podEvicted = true - return podEvicted - } - } - return podEvicted -} - // TestCloudProviderNoRateLimit tests that monitorNodes() immediately deletes // pods and the node when kubelet has not reported, and the cloudprovider says // the node is gone. @@ -1384,10 +1386,18 @@ func TestCloudProviderNoRateLimit(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}), DeleteWaitChan: make(chan struct{}), } - nodeController, _ := newNodeControllerFromClient(nil, fnh, 10*time.Minute, - testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, - testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fnh, + 10*time.Minute, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.cloud = &fakecloud.FakeCloud{} nodeController.now = func() metav1.Time { return metav1.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) } nodeController.recorder = testutil.NewFakeRecorder() @@ -1395,7 +1405,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) { return false, nil } // monitorNodeStatus should allow this node to be immediately deleted - if err := syncNodeStore(nodeController, fnh); err != nil { + if err := nodeController.syncNodeStore(fnh); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1624,12 +1634,21 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for i, item := range table { - nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, - testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + item.fakeNodeHandler, + 5*time.Minute, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1638,7 +1657,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { if item.timeToPass > 0 { nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(item.timeToPass)} } item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus - if err := 
syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1768,12 +1787,21 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) { } for i, item := range table { - nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, - testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + item.fakeNodeHandler, + 5*time.Minute, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1782,7 +1810,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) { if item.timeToPass > 0 { nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(item.timeToPass)} } item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1879,12 +1907,21 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) { originalTaint := UnreachableTaintTemplate updatedTaint := NotReadyTaintTemplate - nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, - evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fakeNodeHandler, + evictionTimeout, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + true) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1922,7 +1959,7 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) { return } - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1972,9 +2009,18 @@ func TestTaintsNodeByCondition(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), } - nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout, - 
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fakeNodeHandler, + evictionTimeout, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + true) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() @@ -2098,11 +2144,11 @@ func TestTaintsNodeByCondition(t *testing.T) { for _, test := range tests { fakeNodeHandler.Update(test.Node) - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } nodeController.doNoScheduleTaintingPass(test.Node) - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } node0, err := nodeController.nodeLister.Get("node0") @@ -2150,10 +2196,18 @@ func TestNodeEventGeneration(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), } - nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute, - testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, - testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fakeNodeHandler, + 5*time.Minute, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.cloud = &fakecloud.FakeCloud{} nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) { return false, nil @@ -2161,7 +2215,7 @@ func TestNodeEventGeneration(t *testing.T) { nodeController.now = func() metav1.Time { return fakeNow } fakeRecorder := testutil.NewFakeRecorder() nodeController.recorder = fakeRecorder - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -2208,9 +2262,18 @@ func TestFixDeprecatedTaintKey(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), } - nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout, - testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fakeNodeHandler, + evictionTimeout, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + true) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() @@ -2319,11 +2382,11 @@ func TestFixDeprecatedTaintKey(t *testing.T) { for _, test := range tests { 
fakeNodeHandler.Update(test.Node) - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } nodeController.doFixDeprecatedTaintKeyPass(test.Node) - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } node, err := nodeController.nodeLister.Get(test.Node.GetName()) diff --git a/pkg/controller/node/scheduler/BUILD b/pkg/controller/nodelifecycle/scheduler/BUILD similarity index 85% rename from pkg/controller/node/scheduler/BUILD rename to pkg/controller/nodelifecycle/scheduler/BUILD index efe8ad0b8c..c9d54bd628 100644 --- a/pkg/controller/node/scheduler/BUILD +++ b/pkg/controller/nodelifecycle/scheduler/BUILD @@ -1,39 +1,14 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_test( - name = "go_default_test", - srcs = [ - "rate_limited_queue_test.go", - "taint_controller_test.go", - "timed_workers_test.go", - ], - embed = [":go_default_library"], - importpath = "k8s.io/kubernetes/pkg/controller/node/scheduler", - deps = [ - "//pkg/controller/testutil:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", - "//vendor/k8s.io/client-go/testing:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - ], -) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ "rate_limited_queue.go", - "taint_controller.go", + "taint_manager.go", "timed_workers.go", ], - importpath = "k8s.io/kubernetes/pkg/controller/node/scheduler", + importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler", + visibility = ["//visibility:public"], deps = [ "//pkg/apis/core/helper:go_default_library", "//pkg/apis/core/v1/helper:go_default_library", @@ -53,6 +28,26 @@ go_library( ], ) +go_test( + name = "go_default_test", + srcs = [ + "rate_limited_queue_test.go", + "taint_manager_test.go", + "timed_workers_test.go", + ], + embed = [":go_default_library"], + importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler", + deps = [ + "//pkg/controller/testutil:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + ], +) + filegroup( name = "package-srcs", srcs = glob(["**"]), @@ -64,4 +59,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/pkg/controller/node/scheduler/rate_limited_queue.go b/pkg/controller/nodelifecycle/scheduler/rate_limited_queue.go similarity index 100% rename from pkg/controller/node/scheduler/rate_limited_queue.go rename to pkg/controller/nodelifecycle/scheduler/rate_limited_queue.go diff --git a/pkg/controller/node/scheduler/rate_limited_queue_test.go 
b/pkg/controller/nodelifecycle/scheduler/rate_limited_queue_test.go similarity index 100% rename from pkg/controller/node/scheduler/rate_limited_queue_test.go rename to pkg/controller/nodelifecycle/scheduler/rate_limited_queue_test.go diff --git a/pkg/controller/node/scheduler/taint_controller.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go similarity index 99% rename from pkg/controller/node/scheduler/taint_controller.go rename to pkg/controller/nodelifecycle/scheduler/taint_manager.go index bf641f69fb..a71fa8fc78 100644 --- a/pkg/controller/node/scheduler/taint_controller.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go @@ -18,9 +18,6 @@ package scheduler import ( "fmt" - "sync" - "time" - "k8s.io/api/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/apis/core/helper" @@ -30,6 +27,8 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "sync" + "time" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" diff --git a/pkg/controller/node/scheduler/taint_controller_test.go b/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go similarity index 100% rename from pkg/controller/node/scheduler/taint_controller_test.go rename to pkg/controller/nodelifecycle/scheduler/taint_manager_test.go diff --git a/pkg/controller/node/scheduler/timed_workers.go b/pkg/controller/nodelifecycle/scheduler/timed_workers.go similarity index 100% rename from pkg/controller/node/scheduler/timed_workers.go rename to pkg/controller/nodelifecycle/scheduler/timed_workers.go diff --git a/pkg/controller/node/scheduler/timed_workers_test.go b/pkg/controller/nodelifecycle/scheduler/timed_workers_test.go similarity index 100% rename from pkg/controller/node/scheduler/timed_workers_test.go rename to pkg/controller/nodelifecycle/scheduler/timed_workers_test.go diff --git a/pkg/controller/node/util/BUILD b/pkg/controller/util/node/BUILD similarity index 87% rename from pkg/controller/node/util/BUILD rename to pkg/controller/util/node/BUILD index b14d0cb926..8a46251040 100644 --- a/pkg/controller/node/util/BUILD +++ b/pkg/controller/util/node/BUILD @@ -1,14 +1,10 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = ["controller_utils.go"], - importpath = "k8s.io/kubernetes/pkg/controller/node/util", + importpath = "k8s.io/kubernetes/pkg/controller/util/node", + visibility = ["//visibility:public"], deps = [ "//pkg/apis/core:go_default_library", "//pkg/cloudprovider:go_default_library", @@ -41,4 +37,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/pkg/controller/node/util/controller_utils.go b/pkg/controller/util/node/controller_utils.go similarity index 96% rename from pkg/controller/node/util/controller_utils.go rename to pkg/controller/util/node/controller_utils.go index 643defce96..4c9a9279ac 100644 --- a/pkg/controller/node/util/controller_utils.go +++ b/pkg/controller/util/node/controller_utils.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package util +package node import ( "errors" @@ -170,9 +170,9 @@ func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error { return fmt.Errorf("%v", strings.Join(errMsg, "; ")) } -// NodeExistsInCloudProvider returns true if the node exists in the +// ExistsInCloudProvider returns true if the node exists in the // cloud provider. -func NodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) { +func ExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) { instances, ok := cloud.Instances() if !ok { return false, fmt.Errorf("%v", ErrCloudInstance) @@ -198,7 +198,7 @@ func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event) } -// RecordNodeStatusChange records a event related to a node status change. +// RecordNodeStatusChange records a event related to a node status change. (Common to lifecycle and ipam) func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) { ref := &v1.ObjectReference{ Kind: "Node", @@ -257,7 +257,7 @@ func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) { } } -// CreateUpdateNodeHandler creates a node update handler. +// CreateUpdateNodeHandler creates a node update handler. (Common to lifecycle and ipam) func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) { return func(origOldObj, origNewObj interface{}) { node := origNewObj.(*v1.Node).DeepCopy() @@ -269,7 +269,7 @@ func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldOb } } -// CreateDeleteNodeHandler creates a delete node handler. +// CreateDeleteNodeHandler creates a delete node handler. 
 func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
 	return func(originalObj interface{}) {
 		originalNode, isNode := originalObj.(*v1.Node)
diff --git a/test/e2e/apps/BUILD b/test/e2e/apps/BUILD
index b6bb7d4fbe..d7194290d6 100644
--- a/test/e2e/apps/BUILD
+++ b/test/e2e/apps/BUILD
@@ -33,7 +33,7 @@ go_library(
         "//pkg/controller/daemon:go_default_library",
         "//pkg/controller/deployment/util:go_default_library",
         "//pkg/controller/job:go_default_library",
-        "//pkg/controller/node:go_default_library",
+        "//pkg/controller/nodelifecycle:go_default_library",
        "//pkg/controller/replicaset:go_default_library",
         "//pkg/controller/replication:go_default_library",
         "//pkg/kubectl:go_default_library",
diff --git a/test/e2e/apps/network_partition.go b/test/e2e/apps/network_partition.go
index 4cabc2b46b..b9a22afc48 100644
--- a/test/e2e/apps/network_partition.go
+++ b/test/e2e/apps/network_partition.go
@@ -33,7 +33,7 @@ import (
 	"k8s.io/api/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	api "k8s.io/kubernetes/pkg/apis/core"
-	nodepkg "k8s.io/kubernetes/pkg/controller/node"
+	nodepkg "k8s.io/kubernetes/pkg/controller/nodelifecycle"
 	"k8s.io/kubernetes/test/e2e/common"
 	"k8s.io/kubernetes/test/e2e/framework"
 	testutils "k8s.io/kubernetes/test/utils"
diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD
index 66bab803c1..9f7e222fed 100644
--- a/test/e2e/framework/BUILD
+++ b/test/e2e/framework/BUILD
@@ -57,7 +57,7 @@ go_library(
         "//pkg/cloudprovider/providers/gce:go_default_library",
         "//pkg/controller:go_default_library",
         "//pkg/controller/deployment/util:go_default_library",
-        "//pkg/controller/node:go_default_library",
+        "//pkg/controller/nodelifecycle:go_default_library",
         "//pkg/features:go_default_library",
         "//pkg/kubectl:go_default_library",
         "//pkg/kubelet/apis/kubeletconfig:go_default_library",
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index bdf52dfdd9..58c01bf61d 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -86,7 +86,7 @@ import (
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/azure"
 	gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
 	"k8s.io/kubernetes/pkg/controller"
-	nodectlr "k8s.io/kubernetes/pkg/controller/node"
+	nodectlr "k8s.io/kubernetes/pkg/controller/nodelifecycle"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubectl"
 	"k8s.io/kubernetes/pkg/kubelet/util/format"
diff --git a/test/integration/garbagecollector/BUILD b/test/integration/garbagecollector/BUILD
index 8bd1651d13..d6a537df07 100644
--- a/test/integration/garbagecollector/BUILD
+++ b/test/integration/garbagecollector/BUILD
@@ -1,9 +1,4 @@
-package(default_visibility = ["//visibility:public"])
-
-load(
-    "@io_bazel_rules_go//go:def.bzl",
-    "go_test",
-)
+load("@io_bazel_rules_go//go:def.bzl", "go_test")
 
 go_test(
     name = "go_default_test",
@@ -52,4 +47,5 @@ filegroup(
     name = "all-srcs",
     srcs = [":package-srcs"],
     tags = ["automanaged"],
+    visibility = ["//visibility:public"],
 )
diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD
index 0fbdc42db6..ec4b7bb01a 100644
--- a/test/integration/scheduler/BUILD
+++ b/test/integration/scheduler/BUILD
@@ -30,8 +30,7 @@ go_test(
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/client/clientset_generated/internalclientset:go_default_library",
         "//pkg/client/informers/informers_generated/internalversion:go_default_library",
-        "//pkg/controller/node:go_default_library",
-        "//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/nodelifecycle:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library", "//pkg/features:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", diff --git a/test/integration/scheduler/taint_test.go b/test/integration/scheduler/taint_test.go index da8fc51aa0..bb227ba4dd 100644 --- a/test/integration/scheduler/taint_test.go +++ b/test/integration/scheduler/taint_test.go @@ -37,8 +37,7 @@ import ( "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" internalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" - "k8s.io/kubernetes/pkg/controller/node" - "k8s.io/kubernetes/pkg/controller/node/ipam" + "k8s.io/kubernetes/pkg/controller/nodelifecycle" kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction" pluginapi "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction" @@ -85,29 +84,24 @@ func TestTaintNodeByCondition(t *testing.T) { controllerCh := make(chan struct{}) defer close(controllerCh) - // Start NodeController for taint. - nc, err := node.NewNodeController( + // Start NodeLifecycleController for taint. + nc, err := nodelifecycle.NewNodeLifecycleController( informers.Core().V1().Pods(), informers.Core().V1().Nodes(), informers.Extensions().V1beta1().DaemonSets(), nil, // CloudProvider clientset, + time.Second, // Node monitor grace period + time.Second, // Node startup grace period + time.Second, // Node monitor period time.Second, // Pod eviction timeout 100, // Eviction limiter QPS 100, // Secondary eviction limiter QPS 100, // Large cluster threshold 100, // Unhealthy zone threshold - time.Second, // Node monitor grace period - time.Second, // Node startup grace period - time.Second, // Node monitor period - nil, // Cluster CIDR - nil, // Service CIDR - 0, // Node CIDR mask size - false, // Allocate node CIDRs - ipam.RangeAllocatorType, // Allocator type - true, // Run taint manger - true, // Enabled taint based eviction - true, // Enabled TaintNodeByCondition feature + true, // Run taint manager + true, // Use taint based evictions + true, // Enabled TaintNodeByCondition feature ) if err != nil { t.Errorf("Failed to create node controller: %v", err)