diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 7bee9288b4..d6940c30d3 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -58,8 +58,9 @@ go_library( "//pkg/controller/garbagecollector:go_default_library", "//pkg/controller/job:go_default_library", "//pkg/controller/namespace:go_default_library", - "//pkg/controller/node:go_default_library", - "//pkg/controller/node/ipam:go_default_library", + "//pkg/controller/nodeipam:go_default_library", + "//pkg/controller/nodeipam/ipam:go_default_library", + "//pkg/controller/nodelifecycle:go_default_library", "//pkg/controller/podautoscaler:go_default_library", "//pkg/controller/podautoscaler/metrics:go_default_library", "//pkg/controller/podgc:go_default_library", diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 1f7832ed9d..85dc51de5c 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -368,10 +368,12 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc controllers["tokencleaner"] = startTokenCleanerController if loopMode == IncludeCloudLoops { controllers["service"] = startServiceController + controllers["nodeipam"] = startNodeIpamController controllers["route"] = startRouteController - // TODO: Move node controller and volume controller into the IncludeCloudLoops only set. + // TODO: Move the volume controller into the IncludeCloudLoops only set. + // TODO: Separate the cluster-in-cloud check from the node lifecycle controller. } - controllers["node"] = startNodeController + controllers["nodelifecycle"] = startNodeLifecycleController controllers["persistentvolume-binder"] = startPersistentVolumeBinderController controllers["attachdetach"] = startAttachDetachController controllers["persistentvolume-expander"] = startVolumeExpandController diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 819d18e0f0..2fceb2370b 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -41,8 +41,9 @@ import ( endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/garbagecollector" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" - nodecontroller "k8s.io/kubernetes/pkg/controller/node" - "k8s.io/kubernetes/pkg/controller/node/ipam" + nodeipamcontroller "k8s.io/kubernetes/pkg/controller/nodeipam" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam" + lifecyclecontroller "k8s.io/kubernetes/pkg/controller/nodelifecycle" "k8s.io/kubernetes/pkg/controller/podgc" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" @@ -77,7 +78,7 @@ func startServiceController(ctx ControllerContext) (bool, error) { return true, nil } -func startNodeController(ctx ControllerContext) (bool, error) { +func startNodeIpamController(ctx ControllerContext) (bool, error) { var clusterCIDR *net.IPNet = nil var serviceCIDR *net.IPNet = nil if ctx.Options.AllocateNodeCIDRs { @@ -97,25 +98,38 @@ func startNodeController(ctx ControllerContext) (bool, error) { } } - nodeController, err := nodecontroller.NewNodeController( - ctx.InformerFactory.Core().V1().Pods(), + nodeIpamController, err := nodeipamcontroller.NewNodeIpamController( ctx.InformerFactory.Core().V1().Nodes(), -
ctx.InformerFactory.Extensions().V1beta1().DaemonSets(), ctx.Cloud, ctx.ClientBuilder.ClientOrDie("node-controller"), - ctx.Options.PodEvictionTimeout.Duration, - ctx.Options.NodeEvictionRate, - ctx.Options.SecondaryNodeEvictionRate, - ctx.Options.LargeClusterSizeThreshold, - ctx.Options.UnhealthyZoneThreshold, - ctx.Options.NodeMonitorGracePeriod.Duration, - ctx.Options.NodeStartupGracePeriod.Duration, - ctx.Options.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(ctx.Options.NodeCIDRMaskSize), ctx.Options.AllocateNodeCIDRs, ipam.CIDRAllocatorType(ctx.Options.CIDRAllocatorType), + ) + if err != nil { + return true, err + } + go nodeIpamController.Run(ctx.Stop) + return true, nil +} + +func startNodeLifecycleController(ctx ControllerContext) (bool, error) { + lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController( + ctx.InformerFactory.Core().V1().Pods(), + ctx.InformerFactory.Core().V1().Nodes(), + ctx.InformerFactory.Extensions().V1beta1().DaemonSets(), + ctx.Cloud, + ctx.ClientBuilder.ClientOrDie("node-controller"), + ctx.Options.NodeMonitorPeriod.Duration, + ctx.Options.NodeStartupGracePeriod.Duration, + ctx.Options.NodeMonitorGracePeriod.Duration, + ctx.Options.PodEvictionTimeout.Duration, + ctx.Options.NodeEvictionRate, + ctx.Options.SecondaryNodeEvictionRate, + ctx.Options.LargeClusterSizeThreshold, + ctx.Options.UnhealthyZoneThreshold, ctx.Options.EnableTaintManager, utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions), utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition), @@ -123,7 +137,7 @@ func startNodeController(ctx ControllerContext) (bool, error) { if err != nil { return true, err } - go nodeController.Run(ctx.Stop) + go lifecycleController.Run(ctx.Stop) return true, nil } diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index f1e808f2ee..7c2db32b5d 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -117,7 +117,8 @@ filegroup( "//pkg/controller/history:all-srcs", "//pkg/controller/job:all-srcs", "//pkg/controller/namespace:all-srcs", - "//pkg/controller/node:all-srcs", + "//pkg/controller/nodeipam:all-srcs", + "//pkg/controller/nodelifecycle:all-srcs", "//pkg/controller/podautoscaler:all-srcs", "//pkg/controller/podgc:all-srcs", "//pkg/controller/replicaset:all-srcs", @@ -129,6 +130,7 @@ filegroup( "//pkg/controller/statefulset:all-srcs", "//pkg/controller/testutil:all-srcs", "//pkg/controller/ttl:all-srcs", + "//pkg/controller/util/node:all-srcs", "//pkg/controller/volume/attachdetach:all-srcs", "//pkg/controller/volume/events:all-srcs", "//pkg/controller/volume/expand:all-srcs", diff --git a/pkg/controller/nodeipam/BUILD b/pkg/controller/nodeipam/BUILD new file mode 100644 index 0000000000..46a62bb93b --- /dev/null +++ b/pkg/controller/nodeipam/BUILD @@ -0,0 +1,48 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "doc.go", + "metrics.go", + "node_ipam_controller.go", + ], + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam", + deps = [ + "//pkg/cloudprovider:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/nodeipam/ipam:go_default_library", + "//pkg/controller/nodeipam/ipam/sync:go_default_library", + "//pkg/util/metrics:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + 
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/controller/nodeipam/ipam:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/pkg/controller/node/OWNERS b/pkg/controller/nodeipam/OWNERS similarity index 100% rename from pkg/controller/node/OWNERS rename to pkg/controller/nodeipam/OWNERS diff --git a/pkg/controller/node/doc.go b/pkg/controller/nodeipam/doc.go similarity index 80% rename from pkg/controller/node/doc.go rename to pkg/controller/nodeipam/doc.go index b649f1dda4..a7b2d12db8 100644 --- a/pkg/controller/node/doc.go +++ b/pkg/controller/nodeipam/doc.go @@ -14,6 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package node contains code for syncing cloud instances with +// Package nodeipam contains code for syncing cloud instances with // node registry -package node // import "k8s.io/kubernetes/pkg/controller/node" +package nodeipam // import "k8s.io/kubernetes/pkg/controller/nodeipam" diff --git a/pkg/controller/node/ipam/BUILD b/pkg/controller/nodeipam/ipam/BUILD similarity index 82% rename from pkg/controller/node/ipam/BUILD rename to pkg/controller/nodeipam/ipam/BUILD index 667f29f6b0..5a1e1018a2 100644 --- a/pkg/controller/node/ipam/BUILD +++ b/pkg/controller/nodeipam/ipam/BUILD @@ -14,11 +14,11 @@ go_test( "timeout_test.go", ], embed = [":go_default_library"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam", deps = [ "//pkg/controller:go_default_library", - "//pkg/controller/node/ipam/cidrset:go_default_library", - "//pkg/controller/node/ipam/test:go_default_library", + "//pkg/controller/nodeipam/ipam/cidrset:go_default_library", + "//pkg/controller/nodeipam/ipam/test:go_default_library", "//pkg/controller/testutil:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -40,15 +40,15 @@ go_library( "range_allocator.go", "timeout.go", ], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam", deps = [ "//pkg/api/v1/node:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/controller:go_default_library", - "//pkg/controller/node/ipam/cidrset:go_default_library", - "//pkg/controller/node/ipam/sync:go_default_library", - "//pkg/controller/node/util:go_default_library", + "//pkg/controller/nodeipam/ipam/cidrset:go_default_library", + "//pkg/controller/nodeipam/ipam/sync:go_default_library", + "//pkg/controller/util/node:go_default_library", "//pkg/util/node:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", @@ -82,9 +82,9 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", - "//pkg/controller/node/ipam/cidrset:all-srcs", - "//pkg/controller/node/ipam/sync:all-srcs", - 
"//pkg/controller/node/ipam/test:all-srcs", + "//pkg/controller/nodeipam/ipam/cidrset:all-srcs", + "//pkg/controller/nodeipam/ipam/sync:all-srcs", + "//pkg/controller/nodeipam/ipam/test:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/controller/node/ipam/OWNERS b/pkg/controller/nodeipam/ipam/OWNERS similarity index 100% rename from pkg/controller/node/ipam/OWNERS rename to pkg/controller/nodeipam/ipam/OWNERS diff --git a/pkg/controller/node/ipam/adapter.go b/pkg/controller/nodeipam/ipam/adapter.go similarity index 100% rename from pkg/controller/node/ipam/adapter.go rename to pkg/controller/nodeipam/ipam/adapter.go diff --git a/pkg/controller/node/ipam/cidr_allocator.go b/pkg/controller/nodeipam/ipam/cidr_allocator.go similarity index 100% rename from pkg/controller/node/ipam/cidr_allocator.go rename to pkg/controller/nodeipam/ipam/cidr_allocator.go diff --git a/pkg/controller/node/ipam/cidrset/BUILD b/pkg/controller/nodeipam/ipam/cidrset/BUILD similarity index 80% rename from pkg/controller/node/ipam/cidrset/BUILD rename to pkg/controller/nodeipam/ipam/cidrset/BUILD index e3accb73bc..c1bbda1c69 100644 --- a/pkg/controller/node/ipam/cidrset/BUILD +++ b/pkg/controller/nodeipam/ipam/cidrset/BUILD @@ -10,14 +10,14 @@ go_test( name = "go_default_test", srcs = ["cidr_set_test.go"], embed = [":go_default_library"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset", deps = ["//vendor/github.com/golang/glog:go_default_library"], ) go_library( name = "go_default_library", srcs = ["cidr_set.go"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset", ) filegroup( diff --git a/pkg/controller/node/ipam/cidrset/cidr_set.go b/pkg/controller/nodeipam/ipam/cidrset/cidr_set.go similarity index 100% rename from pkg/controller/node/ipam/cidrset/cidr_set.go rename to pkg/controller/nodeipam/ipam/cidrset/cidr_set.go diff --git a/pkg/controller/node/ipam/cidrset/cidr_set_test.go b/pkg/controller/nodeipam/ipam/cidrset/cidr_set_test.go similarity index 100% rename from pkg/controller/node/ipam/cidrset/cidr_set_test.go rename to pkg/controller/nodeipam/ipam/cidrset/cidr_set_test.go diff --git a/pkg/controller/node/ipam/cloud_cidr_allocator.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go similarity index 94% rename from pkg/controller/node/ipam/cloud_cidr_allocator.go rename to pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go index 98b65b22e6..7a07409c7c 100644 --- a/pkg/controller/node/ipam/cloud_cidr_allocator.go +++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go @@ -40,7 +40,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/node/util" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" utilnode "k8s.io/kubernetes/pkg/util/node" ) @@ -101,8 +101,8 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter } nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR), - UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + AddFunc: nodeutil.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR), + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { if newNode.Spec.PodCIDR == "" { return ca.AllocateOrOccupyCIDR(newNode) } @@ -114,7 +114,7 @@ 
func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter } return nil }), - DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR), + DeleteFunc: nodeutil.CreateDeleteNodeHandler(ca.ReleaseCIDR), }) glog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName()) @@ -197,11 +197,11 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error { cidrs, err := ca.cloud.AliasRanges(types.NodeName(nodeName)) if err != nil { - util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") + nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: %v", err) } if len(cidrs) == 0 { - util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") + nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name) } _, cidr, err := net.ParseCIDR(cidrs[0]) @@ -237,7 +237,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error { glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err) } if err != nil { - util.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed") + nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed") glog.Errorf("CIDR assignment for node %v failed: %v.", nodeName, err) return err } diff --git a/pkg/controller/node/ipam/controller.go b/pkg/controller/nodeipam/ipam/controller.go similarity index 93% rename from pkg/controller/node/ipam/controller.go rename to pkg/controller/nodeipam/ipam/controller.go index 4b1221b678..6ab18d69f6 100644 --- a/pkg/controller/node/ipam/controller.go +++ b/pkg/controller/nodeipam/ipam/controller.go @@ -30,9 +30,9 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" - "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" - nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync" - "k8s.io/kubernetes/pkg/controller/node/util" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" + nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" ) // Config for the IPAM controller. 
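The surrounding hunks only re-alias the handler helpers that moved from pkg/controller/node/util to pkg/controller/util/node (now imported as nodeutil); the call sites keep passing the same typed callbacks. As a rough, self-contained sketch of the wrapper pattern those helpers implement — the lowercase function names below are stand-ins inferred from the call sites, not code copied from the package — registration reads roughly like this:

package nodeutilsketch

import (
	"log"

	"k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// createAddNodeHandler is a stand-in for the helper the diff calls
// nodeutil.CreateAddNodeHandler: it adapts a typed func(*v1.Node) error
// into the untyped AddFunc expected by cache.ResourceEventHandlerFuncs.
func createAddNodeHandler(f func(*v1.Node) error) func(obj interface{}) {
	return func(obj interface{}) {
		node, ok := obj.(*v1.Node)
		if !ok {
			return // not a *v1.Node; ignore
		}
		if err := f(node); err != nil {
			log.Printf("error handling add of node %q: %v", node.Name, err)
		}
	}
}

// registerAllocator mirrors the call sites in the patch: wrap the allocator's
// AllocateOrOccupyCIDR-style callback and attach it to the node informer.
func registerAllocator(informer cache.SharedIndexInformer, allocate func(*v1.Node) error) {
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: createAddNodeHandler(allocate),
	})
}

The update and delete variants used in the patch (CreateUpdateNodeHandler, CreateDeleteNodeHandler) follow the same shape, taking func(old, new *v1.Node) error and func(*v1.Node) error callbacks respectively.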
@@ -128,9 +128,9 @@ func (c *Controller) Start(nodeInformer informers.NodeInformer) error { } nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(c.onAdd), - UpdateFunc: util.CreateUpdateNodeHandler(c.onUpdate), - DeleteFunc: util.CreateDeleteNodeHandler(c.onDelete), + AddFunc: nodeutil.CreateAddNodeHandler(c.onAdd), + UpdateFunc: nodeutil.CreateUpdateNodeHandler(c.onUpdate), + DeleteFunc: nodeutil.CreateDeleteNodeHandler(c.onDelete), }) return nil diff --git a/pkg/controller/node/ipam/controller_test.go b/pkg/controller/nodeipam/ipam/controller_test.go similarity index 94% rename from pkg/controller/node/ipam/controller_test.go rename to pkg/controller/nodeipam/ipam/controller_test.go index 14fbb4340f..6e5a6f9957 100644 --- a/pkg/controller/node/ipam/controller_test.go +++ b/pkg/controller/nodeipam/ipam/controller_test.go @@ -20,8 +20,8 @@ import ( "net" "testing" - "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" - "k8s.io/kubernetes/pkg/controller/node/ipam/test" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test" ) func TestOccupyServiceCIDR(t *testing.T) { diff --git a/pkg/controller/node/ipam/doc.go b/pkg/controller/nodeipam/ipam/doc.go similarity index 100% rename from pkg/controller/node/ipam/doc.go rename to pkg/controller/nodeipam/ipam/doc.go diff --git a/pkg/controller/node/ipam/range_allocator.go b/pkg/controller/nodeipam/ipam/range_allocator.go similarity index 94% rename from pkg/controller/node/ipam/range_allocator.go rename to pkg/controller/nodeipam/ipam/range_allocator.go index d3037b1d1d..5de2195854 100644 --- a/pkg/controller/node/ipam/range_allocator.go +++ b/pkg/controller/nodeipam/ipam/range_allocator.go @@ -36,9 +36,9 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" - "k8s.io/kubernetes/pkg/controller/node/util" - nodeutil "k8s.io/kubernetes/pkg/util/node" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" + utilnode "k8s.io/kubernetes/pkg/util/node" ) type rangeAllocator struct { @@ -119,8 +119,8 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No } nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR), - UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + AddFunc: nodeutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR), + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { // If the PodCIDR is not empty we either: // - already processed a Node that already had a CIDR after NC restarted // (cidr is marked as used), @@ -145,7 +145,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No } return nil }), - DeleteFunc: util.CreateDeleteNodeHandler(ra.ReleaseCIDR), + DeleteFunc: nodeutil.CreateDeleteNodeHandler(ra.ReleaseCIDR), }) return ra, nil @@ -234,7 +234,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error { podCIDR, err := r.cidrs.AllocateNext() if err != nil { r.removeNodeFromProcessing(node.Name) - util.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") + nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable") return fmt.Errorf("failed to allocate cidr: %v", err) } @@ -303,14 +303,14 @@ func (r *rangeAllocator) 
updateCIDRAllocation(data nodeAndCIDR) error { } return nil } - if err = nodeutil.PatchNodeCIDR(r.client, types.NodeName(node.Name), podCIDR); err == nil { + if err = utilnode.PatchNodeCIDR(r.client, types.NodeName(node.Name), podCIDR); err == nil { glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR) break } glog.Errorf("Failed to update node %v PodCIDR to %v (%d retries left): %v", node.Name, podCIDR, cidrUpdateRetries-rep-1, err) } if err != nil { - util.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") + nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed") // We accept the fact that we may leek CIDRs here. This is safer than releasing // them in case when we don't know if request went through. // NodeController restart will return all falsely allocated CIDRs to the pool. diff --git a/pkg/controller/node/ipam/range_allocator_test.go b/pkg/controller/nodeipam/ipam/range_allocator_test.go similarity index 100% rename from pkg/controller/node/ipam/range_allocator_test.go rename to pkg/controller/nodeipam/ipam/range_allocator_test.go diff --git a/pkg/controller/node/ipam/sync/BUILD b/pkg/controller/nodeipam/ipam/sync/BUILD similarity index 72% rename from pkg/controller/node/ipam/sync/BUILD rename to pkg/controller/nodeipam/ipam/sync/BUILD index 6530b5d812..2ecba089f4 100644 --- a/pkg/controller/node/ipam/sync/BUILD +++ b/pkg/controller/nodeipam/ipam/sync/BUILD @@ -3,10 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = ["sync.go"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/sync", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync", visibility = ["//visibility:public"], deps = [ - "//pkg/controller/node/ipam/cidrset:go_default_library", + "//pkg/controller/nodeipam/ipam/cidrset:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", ], @@ -16,10 +16,10 @@ go_test( name = "go_default_test", srcs = ["sync_test.go"], embed = [":go_default_library"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/sync", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync", deps = [ - "//pkg/controller/node/ipam/cidrset:go_default_library", - "//pkg/controller/node/ipam/test:go_default_library", + "//pkg/controller/nodeipam/ipam/cidrset:go_default_library", + "//pkg/controller/nodeipam/ipam/test:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/controller/node/ipam/sync/sync.go b/pkg/controller/nodeipam/ipam/sync/sync.go similarity index 99% rename from pkg/controller/node/ipam/sync/sync.go rename to pkg/controller/nodeipam/ipam/sync/sync.go index 4995f42554..fabc3a1126 100644 --- a/pkg/controller/node/ipam/sync/sync.go +++ b/pkg/controller/nodeipam/ipam/sync/sync.go @@ -25,7 +25,7 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" - "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" ) const ( diff --git a/pkg/controller/node/ipam/sync/sync_test.go b/pkg/controller/nodeipam/ipam/sync/sync_test.go similarity index 98% rename from pkg/controller/node/ipam/sync/sync_test.go rename to pkg/controller/nodeipam/ipam/sync/sync_test.go index d326848043..4a47280d94 100644 --- a/pkg/controller/node/ipam/sync/sync_test.go +++ 
b/pkg/controller/nodeipam/ipam/sync/sync_test.go @@ -26,8 +26,8 @@ import ( "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/pkg/controller/node/ipam/cidrset" - "k8s.io/kubernetes/pkg/controller/node/ipam/test" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test" "k8s.io/api/core/v1" ) diff --git a/pkg/controller/node/ipam/test/BUILD b/pkg/controller/nodeipam/ipam/test/BUILD similarity index 85% rename from pkg/controller/node/ipam/test/BUILD rename to pkg/controller/nodeipam/ipam/test/BUILD index 38155ed097..0c6fd3a281 100644 --- a/pkg/controller/node/ipam/test/BUILD +++ b/pkg/controller/nodeipam/ipam/test/BUILD @@ -3,7 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = ["utils.go"], - importpath = "k8s.io/kubernetes/pkg/controller/node/ipam/test", + importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/test", visibility = ["//visibility:public"], ) diff --git a/pkg/controller/node/ipam/test/utils.go b/pkg/controller/nodeipam/ipam/test/utils.go similarity index 100% rename from pkg/controller/node/ipam/test/utils.go rename to pkg/controller/nodeipam/ipam/test/utils.go diff --git a/pkg/controller/node/ipam/timeout.go b/pkg/controller/nodeipam/ipam/timeout.go similarity index 100% rename from pkg/controller/node/ipam/timeout.go rename to pkg/controller/nodeipam/ipam/timeout.go diff --git a/pkg/controller/node/ipam/timeout_test.go b/pkg/controller/nodeipam/ipam/timeout_test.go similarity index 100% rename from pkg/controller/node/ipam/timeout_test.go rename to pkg/controller/nodeipam/ipam/timeout_test.go diff --git a/pkg/controller/nodeipam/metrics.go b/pkg/controller/nodeipam/metrics.go new file mode 100644 index 0000000000..9211ce3f38 --- /dev/null +++ b/pkg/controller/nodeipam/metrics.go @@ -0,0 +1,21 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeipam + +// Register the metrics that are to be monitored. +func Register() { +} diff --git a/pkg/controller/nodeipam/node_ipam_controller.go b/pkg/controller/nodeipam/node_ipam_controller.go new file mode 100644 index 0000000000..e2dad9e4f5 --- /dev/null +++ b/pkg/controller/nodeipam/node_ipam_controller.go @@ -0,0 +1,187 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nodeipam + +import ( + "net" + "time" + + "github.com/golang/glog" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + + "k8s.io/api/core/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/nodeipam/ipam" + nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync" + "k8s.io/kubernetes/pkg/util/metrics" +) + +func init() { + // Register prometheus metrics + Register() +} + +const ( + // ipamResyncInterval is the amount of time between when the cloud and node + // CIDR range assignments are synchronized. + ipamResyncInterval = 30 * time.Second + // ipamMaxBackoff is the maximum backoff for retrying synchronization of a + // given node in the error state. + ipamMaxBackoff = 10 * time.Second + // ipamInitialBackoff is the initial retry interval for retrying synchronization of a + // given node in the error state. + ipamInitialBackoff = 250 * time.Millisecond +) + +// Controller is the controller that manages node ipam state. +type Controller struct { + allocateNodeCIDRs bool + allocatorType ipam.CIDRAllocatorType + + cloud cloudprovider.Interface + clusterCIDR *net.IPNet + serviceCIDR *net.IPNet + kubeClient clientset.Interface + // Method for easy mocking in unittest. + lookupIP func(host string) ([]net.IP, error) + + nodeLister corelisters.NodeLister + nodeInformerSynced cache.InformerSynced + + cidrAllocator ipam.CIDRAllocator + + forcefullyDeletePod func(*v1.Pod) error +} + +// NewNodeIpamController returns a new node IP Address Management controller to +// sync instances from cloudprovider. +// This method returns an error if it is unable to initialize the CIDR bitmap with +// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes +// currently, this should be handled as a fatal error.
+func NewNodeIpamController( + nodeInformer coreinformers.NodeInformer, + cloud cloudprovider.Interface, + kubeClient clientset.Interface, + clusterCIDR *net.IPNet, + serviceCIDR *net.IPNet, + nodeCIDRMaskSize int, + allocateNodeCIDRs bool, + allocatorType ipam.CIDRAllocatorType) (*Controller, error) { + + if kubeClient == nil { + glog.Fatalf("kubeClient is nil when starting Controller") + } + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + + glog.V(0).Infof("Sending events to api server.") + eventBroadcaster.StartRecordingToSink( + &v1core.EventSinkImpl{ + Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""), + }) + + if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("node_ipam_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) + } + + if allocateNodeCIDRs { + if clusterCIDR == nil { + glog.Fatal("Controller: Must specify clusterCIDR if allocateNodeCIDRs == true.") + } + mask := clusterCIDR.Mask + if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize { + glog.Fatal("Controller: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.") + } + } + + ic := &Controller{ + cloud: cloud, + kubeClient: kubeClient, + lookupIP: net.LookupIP, + clusterCIDR: clusterCIDR, + serviceCIDR: serviceCIDR, + allocateNodeCIDRs: allocateNodeCIDRs, + allocatorType: allocatorType, + } + + // TODO: Abstract this check into a generic controller manager should run method. + if ic.allocateNodeCIDRs { + if ic.allocatorType == ipam.IPAMFromClusterAllocatorType || ic.allocatorType == ipam.IPAMFromCloudAllocatorType { + cfg := &ipam.Config{ + Resync: ipamResyncInterval, + MaxBackoff: ipamMaxBackoff, + InitialRetry: ipamInitialBackoff, + } + switch ic.allocatorType { + case ipam.IPAMFromClusterAllocatorType: + cfg.Mode = nodesync.SyncFromCluster + case ipam.IPAMFromCloudAllocatorType: + cfg.Mode = nodesync.SyncFromCloud + } + ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize) + if err != nil { + glog.Fatalf("Error creating ipam controller: %v", err) + } + if err := ipamc.Start(nodeInformer); err != nil { + glog.Fatalf("Error trying to Init(): %v", err) + } + } else { + var err error + ic.cidrAllocator, err = ipam.New( + kubeClient, cloud, nodeInformer, ic.allocatorType, ic.clusterCIDR, ic.serviceCIDR, nodeCIDRMaskSize) + if err != nil { + return nil, err + } + } + } + + ic.nodeLister = nodeInformer.Lister() + ic.nodeInformerSynced = nodeInformer.Informer().HasSynced + + return ic, nil +} + +// Run starts an asynchronous loop that monitors the status of cluster nodes. +func (nc *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + glog.Infof("Starting ipam controller") + defer glog.Infof("Shutting down ipam controller") + + if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced) { + return + } + + // TODO: Abstract this check into a generic controller manager should run method. 
+ if nc.allocateNodeCIDRs { + if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType { + go nc.cidrAllocator.Run(stopCh) + } + } + + <-stopCh +} diff --git a/pkg/controller/node/BUILD b/pkg/controller/nodelifecycle/BUILD similarity index 78% rename from pkg/controller/node/BUILD rename to pkg/controller/nodelifecycle/BUILD index 57a684c8fe..25d036e3cf 100644 --- a/pkg/controller/node/BUILD +++ b/pkg/controller/nodelifecycle/BUILD @@ -1,24 +1,76 @@ -package(default_visibility = ["//visibility:public"]) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", +go_library( + name = "go_default_library", + srcs = [ + "metrics.go", + "node_lifecycle_controller.go", + ], + importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle", + visibility = ["//visibility:public"], + deps = [ + "//pkg/api/v1/node:go_default_library", + "//pkg/cloudprovider:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/nodelifecycle/scheduler:go_default_library", + "//pkg/controller/util/node:go_default_library", + "//pkg/util/metrics:go_default_library", + "//pkg/util/node:go_default_library", + "//pkg/util/system:go_default_library", + "//pkg/util/taints:go_default_library", + "//pkg/util/version:go_default_library", + "//plugin/pkg/scheduler/algorithm:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/controller/nodelifecycle/scheduler:all-srcs", + ], + tags = ["automanaged"], + visibility = ["//visibility:public"], ) go_test( name = "go_default_test", - srcs = ["nodecontroller_test.go"], + srcs = ["node_lifecycle_controller_test.go"], embed = [":go_default_library"], - importpath = "k8s.io/kubernetes/pkg/controller/node", + importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle", deps = [ "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/controller:go_default_library", - "//pkg/controller/node/ipam:go_default_library", - 
"//pkg/controller/node/scheduler:go_default_library", - "//pkg/controller/node/util:go_default_library", + "//pkg/controller/nodelifecycle/scheduler:go_default_library", "//pkg/controller/testutil:go_default_library", + "//pkg/controller/util/node:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/util/node:go_default_library", "//pkg/util/taints:go_default_library", @@ -39,65 +91,3 @@ go_test( "//vendor/k8s.io/client-go/testing:go_default_library", ], ) - -go_library( - name = "go_default_library", - srcs = [ - "doc.go", - "metrics.go", - "node_controller.go", - ], - importpath = "k8s.io/kubernetes/pkg/controller/node", - deps = [ - "//pkg/api/v1/node:go_default_library", - "//pkg/cloudprovider:go_default_library", - "//pkg/controller:go_default_library", - "//pkg/controller/node/ipam:go_default_library", - "//pkg/controller/node/ipam/sync:go_default_library", - "//pkg/controller/node/scheduler:go_default_library", - "//pkg/controller/node/util:go_default_library", - "//pkg/util/metrics:go_default_library", - "//pkg/util/node:go_default_library", - "//pkg/util/system:go_default_library", - "//pkg/util/taints:go_default_library", - "//plugin/pkg/scheduler/algorithm:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", - "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", - "//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library", - "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", - "//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", - "//vendor/k8s.io/client-go/tools/record:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [ - ":package-srcs", - "//pkg/controller/node/ipam:all-srcs", - "//pkg/controller/node/scheduler:all-srcs", - "//pkg/controller/node/util:all-srcs", - ], - tags = ["automanaged"], -) diff --git a/pkg/controller/node/metrics.go b/pkg/controller/nodelifecycle/metrics.go similarity index 97% rename from pkg/controller/node/metrics.go rename to pkg/controller/nodelifecycle/metrics.go index 31bba5b233..ae61266c8a 100644 --- a/pkg/controller/node/metrics.go +++ b/pkg/controller/nodelifecycle/metrics.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +package nodelifecycle import ( "sync" diff --git a/pkg/controller/node/node_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go similarity index 84% rename from pkg/controller/node/node_controller.go rename to pkg/controller/nodelifecycle/node_lifecycle_controller.go index ab490d6a05..e2a47f2076 100644 --- a/pkg/controller/node/node_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,16 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package node +// The Controller sets tainted annotations on nodes. +// Tainted nodes should not be used for new workloads and +// some effort should be given to getting existing +// workloads off of tainted nodes. + +package nodelifecycle import ( - "fmt" - "net" - "sync" - "time" - - "github.com/golang/glog" - + "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,31 +30,31 @@ import ( "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - - "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/flowcontrol" - - "k8s.io/api/core/v1" coreinformers "k8s.io/client-go/informers/core/v1" extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" corelisters "k8s.io/client-go/listers/core/v1" extensionslisters "k8s.io/client-go/listers/extensions/v1beta1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/flowcontrol" v1node "k8s.io/kubernetes/pkg/api/v1/node" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/node/ipam" - nodesync "k8s.io/kubernetes/pkg/controller/node/ipam/sync" - "k8s.io/kubernetes/pkg/controller/node/scheduler" - "k8s.io/kubernetes/pkg/controller/node/util" + "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" "k8s.io/kubernetes/pkg/util/metrics" utilnode "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/system" taintutils "k8s.io/kubernetes/pkg/util/taints" + utilversion "k8s.io/kubernetes/pkg/util/version" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + + "fmt" + "github.com/golang/glog" + "sync" + "time" ) func init() { @@ -64,11 +63,14 @@ func init() { } var ( + gracefulDeletionVersion = utilversion.MustParseSemantic("v1.1.0") + // UnreachableTaintTemplate is the taint for when a node becomes unreachable. UnreachableTaintTemplate = &v1.Taint{ Key: algorithm.TaintNodeUnreachable, Effect: v1.TaintEffectNoExecute, } + // NotReadyTaintTemplate is the taint for when a node is not ready for // executing pods NotReadyTaintTemplate = &v1.Taint{ @@ -91,23 +93,6 @@ var ( } ) -const ( - // The amount of time the nodecontroller polls on the list nodes endpoint.
- apiserverStartupGracePeriod = 10 * time.Minute - // The amount of time the nodecontroller should sleep between retrying NodeStatus updates - retrySleepTime = 20 * time.Millisecond - - // ipamResyncInterval is the amount of time between when the cloud and node - // CIDR range assignments are synchronized. - ipamResyncInterval = 30 * time.Second - // ipamMaxBackoff is the maximum backoff for retrying synchronization of a - // given in the error state. - ipamMaxBackoff = 10 * time.Second - // ipamInitialRetry is the initial retry interval for retrying synchronization of a - // given in the error state. - ipamInitialBackoff = 250 * time.Millisecond -) - // ZoneState is the state of a given zone. type ZoneState string @@ -118,24 +103,68 @@ const ( statePartialDisruption = ZoneState("PartialDisruption") ) +const ( + // The amount of time the nodecontroller polls on the list nodes endpoint. + apiserverStartupGracePeriod = 10 * time.Minute + // The amount of time the nodecontroller should sleep between retrying NodeStatus updates + retrySleepTime = 20 * time.Millisecond +) + type nodeStatusData struct { probeTimestamp metav1.Time readyTransitionTimestamp metav1.Time status v1.NodeStatus } -// Controller is the controller that manages node related cluster state. +// Controller is the controller that manages the node lifecycle. type Controller struct { - allocateNodeCIDRs bool - allocatorType ipam.CIDRAllocatorType + taintManager *scheduler.NoExecuteTaintManager + + podInformerSynced cache.InformerSynced + cloud cloudprovider.Interface + kubeClient clientset.Interface + + // This timestamp is to be used instead of LastProbeTime stored in Condition. We do this + // to avoid the problem with time skew across the cluster. + now func() metav1.Time + + enterPartialDisruptionFunc func(nodeNum int) float32 + enterFullDisruptionFunc func(nodeNum int) float32 + computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState) - cloud cloudprovider.Interface - clusterCIDR *net.IPNet - serviceCIDR *net.IPNet knownNodeSet map[string]*v1.Node - kubeClient clientset.Interface - // Method for easy mocking in unittest. - lookupIP func(host string) ([]net.IP, error) + // per Node map storing last observed Status together with a local time when it was observed. + nodeStatusMap map[string]nodeStatusData + + // Lock to access evictor workers + evictorLock sync.Mutex + + // workers that evict pods from unresponsive nodes. + zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue + + // workers that are responsible for tainting nodes. + zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue + + zoneStates map[string]ZoneState + + daemonSetStore extensionslisters.DaemonSetLister + daemonSetInformerSynced cache.InformerSynced + + nodeLister corelisters.NodeLister + nodeInformerSynced cache.InformerSynced + nodeExistsInCloudProvider func(types.NodeName) (bool, error) + + recorder record.EventRecorder + + // Value controlling Controller monitoring period, i.e. how often does Controller + // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod. + // TODO: Change node status monitor to watch based. + nodeMonitorPeriod time.Duration + + // Value used if sync_nodes_status=False, only for node startup. When node + // is just created, e.g. cluster bootstrap or node creation, we give a longer grace period. + nodeStartupGracePeriod time.Duration + // Value used if sync_nodes_status=False.
Controller will not proactively // sync node status in this case, but will monitor node status updated from kubelet. If // it doesn't receive update for this amount of time, it will start posting "NodeReady== @@ -151,45 +180,8 @@ type Controller struct { // 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes // longer for user to see up-to-date node status. nodeMonitorGracePeriod time.Duration - // Value controlling Controller monitoring period, i.e. how often does Controller - // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod. - // TODO: Change node status monitor to watch based. - nodeMonitorPeriod time.Duration - // Value used if sync_nodes_status=False, only for node startup. When node - // is just created, e.g. cluster bootstrap or node creation, we give a longer grace period. - nodeStartupGracePeriod time.Duration - // per Node map storing last observed Status together with a local time when it was observed. - nodeStatusMap map[string]nodeStatusData - // This timestamp is to be used instead of LastProbeTime stored in Condition. We do this - // to aviod the problem with time skew across the cluster. - now func() metav1.Time - // Lock to access evictor workers - evictorLock sync.Mutex - // workers that evicts pods from unresponsive nodes. - zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue - // workers that are responsible for tainting nodes. - zoneNoExecuteTainer map[string]*scheduler.RateLimitedTimedQueue - podEvictionTimeout time.Duration - // The maximum duration before a pod evicted from a node can be forcefully terminated. - maximumGracePeriod time.Duration - recorder record.EventRecorder - nodeLister corelisters.NodeLister - nodeInformerSynced cache.InformerSynced - - daemonSetStore extensionslisters.DaemonSetLister - daemonSetInformerSynced cache.InformerSynced - - podInformerSynced cache.InformerSynced - cidrAllocator ipam.CIDRAllocator - taintManager *scheduler.NoExecuteTaintManager - - nodeExistsInCloudProvider func(types.NodeName) (bool, error) - computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState) - enterPartialDisruptionFunc func(nodeNum int) float32 - enterFullDisruptionFunc func(nodeNum int) float32 - - zoneStates map[string]ZoneState + podEvictionTimeout time.Duration evictionLimiterQPS float32 secondaryEvictionLimiterQPS float32 largeClusterThreshold int32 @@ -208,29 +200,20 @@ type Controller struct { taintNodeByCondition bool } -// NewNodeController returns a new node controller to sync instances from cloudprovider. -// This method returns an error if it is unable to initialize the CIDR bitmap with -// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes -// currently, this should be handled as a fatal error. -func NewNodeController( - podInformer coreinformers.PodInformer, +// NewNodeLifecycleController returns a new taint controller. 
+func NewNodeLifecycleController(podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, daemonSetInformer extensionsinformers.DaemonSetInformer, cloud cloudprovider.Interface, kubeClient clientset.Interface, + nodeMonitorPeriod time.Duration, + nodeStartupGracePeriod time.Duration, + nodeMonitorGracePeriod time.Duration, podEvictionTimeout time.Duration, evictionLimiterQPS float32, secondaryEvictionLimiterQPS float32, largeClusterThreshold int32, unhealthyZoneThreshold float32, - nodeMonitorGracePeriod time.Duration, - nodeStartupGracePeriod time.Duration, - nodeMonitorPeriod time.Duration, - clusterCIDR *net.IPNet, - serviceCIDR *net.IPNet, - nodeCIDRMaskSize int, - allocateNodeCIDRs bool, - allocatorType ipam.CIDRAllocatorType, runTaintManager bool, useTaintBasedEvictions bool, taintNodeByCondition bool) (*Controller, error) { @@ -241,55 +224,32 @@ func NewNodeController( eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"}) - eventBroadcaster.StartLogging(glog.Infof) - - glog.V(0).Infof("Sending events to api server.") - eventBroadcaster.StartRecordingToSink( - &v1core.EventSinkImpl{ - Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events(""), - }) if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { - metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) - } - - if allocateNodeCIDRs { - if clusterCIDR == nil { - glog.Fatal("Controller: Must specify clusterCIDR if allocateNodeCIDRs == true.") - } - mask := clusterCIDR.Mask - if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize { - glog.Fatalf("Controller: Invalid clusterCIDR, mask size of clusterCIDR(%d) must be less than nodeCIDRMaskSize(%d).", maskSize, nodeCIDRMaskSize) - } + metrics.RegisterMetricAndTrackRateLimiterUsage("node_lifecycle_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) } nc := &Controller{ - cloud: cloud, - knownNodeSet: make(map[string]*v1.Node), - kubeClient: kubeClient, - recorder: recorder, - podEvictionTimeout: podEvictionTimeout, - maximumGracePeriod: 5 * time.Minute, - zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue), - zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue), - nodeStatusMap: make(map[string]nodeStatusData), - nodeMonitorGracePeriod: nodeMonitorGracePeriod, - nodeMonitorPeriod: nodeMonitorPeriod, - nodeStartupGracePeriod: nodeStartupGracePeriod, - lookupIP: net.LookupIP, - now: metav1.Now, - clusterCIDR: clusterCIDR, - serviceCIDR: serviceCIDR, - allocateNodeCIDRs: allocateNodeCIDRs, - allocatorType: allocatorType, + cloud: cloud, + kubeClient: kubeClient, + now: metav1.Now, + knownNodeSet: make(map[string]*v1.Node), + nodeStatusMap: make(map[string]nodeStatusData), nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { - return util.NodeExistsInCloudProvider(cloud, nodeName) + return nodeutil.ExistsInCloudProvider(cloud, nodeName) }, + recorder: recorder, + nodeMonitorPeriod: nodeMonitorPeriod, + nodeStartupGracePeriod: nodeStartupGracePeriod, + nodeMonitorGracePeriod: nodeMonitorGracePeriod, + zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue), + zoneNoExecuteTainter: make(map[string]*scheduler.RateLimitedTimedQueue), + zoneStates: make(map[string]ZoneState), + podEvictionTimeout: podEvictionTimeout, evictionLimiterQPS: evictionLimiterQPS, secondaryEvictionLimiterQPS: 
secondaryEvictionLimiterQPS, largeClusterThreshold: largeClusterThreshold, unhealthyZoneThreshold: unhealthyZoneThreshold, - zoneStates: make(map[string]ZoneState), runTaintManager: runTaintManager, useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager, taintNodeByCondition: taintNodeByCondition, @@ -297,6 +257,7 @@ func NewNodeController( if useTaintBasedEvictions { glog.Infof("Controller is using taint based evictions.") } + nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc nc.enterFullDisruptionFunc = nc.HealthyQPSFunc nc.computeZoneStateFunc = nc.ComputeZoneState @@ -337,48 +298,18 @@ func NewNodeController( }) nc.podInformerSynced = podInformer.Informer().HasSynced - if nc.allocateNodeCIDRs { - if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType { - cfg := &ipam.Config{ - Resync: ipamResyncInterval, - MaxBackoff: ipamMaxBackoff, - InitialRetry: ipamInitialBackoff, - } - switch nc.allocatorType { - case ipam.IPAMFromClusterAllocatorType: - cfg.Mode = nodesync.SyncFromCluster - case ipam.IPAMFromCloudAllocatorType: - cfg.Mode = nodesync.SyncFromCloud - } - ipamc, err := ipam.NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, nodeCIDRMaskSize) - if err != nil { - glog.Fatalf("Error creating ipam controller: %v", err) - } - if err := ipamc.Start(nodeInformer); err != nil { - glog.Fatalf("Error trying to Init(): %v", err) - } - } else { - var err error - nc.cidrAllocator, err = ipam.New( - kubeClient, cloud, nodeInformer, nc.allocatorType, nc.clusterCIDR, nc.serviceCIDR, nodeCIDRMaskSize) - if err != nil { - return nil, err - } - } - } - if nc.runTaintManager { nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient) nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error { + AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { nc.taintManager.NodeUpdated(nil, node) return nil }), - UpdateFunc: util.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error { + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error { nc.taintManager.NodeUpdated(oldNode, newNode) return nil }), - DeleteFunc: util.CreateDeleteNodeHandler(func(node *v1.Node) error { + DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error { nc.taintManager.NodeUpdated(node, nil) return nil }), @@ -388,10 +319,10 @@ func NewNodeController( if nc.taintNodeByCondition { glog.Infof("Controller will taint node by condition.") nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error { + AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { return nc.doNoScheduleTaintingPass(node) }), - UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { return nc.doNoScheduleTaintingPass(newNode) }), }) @@ -400,10 +331,10 @@ func NewNodeController( // NOTE(resouer): nodeInformer to substitute deprecated taint key (notReady -> not-ready). 
// Remove this logic when we don't need this backwards compatibility nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error { + AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { return nc.doFixDeprecatedTaintKeyPass(node) }), - UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { + UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { return nc.doFixDeprecatedTaintKeyPass(newNode) }), }) @@ -417,33 +348,40 @@ func NewNodeController( return nc, nil } -func (nc *Controller) doEvictionPass() { - nc.evictorLock.Lock() - defer nc.evictorLock.Unlock() - for k := range nc.zonePodEvictor { - // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded). - nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) { - node, err := nc.nodeLister.Get(value.Value) - if apierrors.IsNotFound(err) { - glog.Warningf("Node %v no longer present in nodeLister!", value.Value) - } else if err != nil { - glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err) - } else { - zone := utilnode.GetZoneKey(node) - evictionsNumber.WithLabelValues(zone).Inc() - } - nodeUID, _ := value.UID.(string) - remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) - return false, 0 - } - if remaining { - glog.Infof("Pods awaiting deletion due to Controller eviction") - } - return true, 0 - }) +// Run starts an asynchronous loop that monitors the status of cluster nodes. +func (nc *Controller) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + glog.Infof("Starting node controller") + defer glog.Infof("Shutting down node controller") + + if !controller.WaitForCacheSync("taint", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) { + return } + + if nc.runTaintManager { + go nc.taintManager.Run(wait.NeverStop) + } + + if nc.useTaintBasedEvictions { + // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated + // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints. + go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop) + } else { + // Managing eviction of nodes: + // When we delete pods off a node, if the node was not empty at the time we then + // queue an eviction watcher. If we hit an error, retry deletion. + go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop) + } + + // Incorporate the results of node status pushed from kubelet to master. + go wait.Until(func() { + if err := nc.monitorNodeStatus(); err != nil { + glog.Errorf("Error monitoring node status: %v", err) + } + }, nc.nodeMonitorPeriod, wait.NeverStop) + + <-stopCh } // doFixDeprecatedTaintKeyPass checks and replaces deprecated taint key with proper key name if needed. 
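For orientation, the relocated Run above follows the standard controller startup skeleton: crash handling, cache sync, periodic workers via wait.Until, then block on the stop channel. A minimal sketch of that skeleton follows; the type and field names are placeholders, and it uses the public cache.WaitForCacheSync rather than the internal controller.WaitForCacheSync wrapper used in the patch.

package lifecyclesketch

import (
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

// demoController carries only the fields the startup skeleton needs;
// the real lifecycle controller holds far more state.
type demoController struct {
	nodeInformerSynced cache.InformerSynced
	nodeMonitorPeriod  time.Duration
	monitorNodeStatus  func() error
}

// Run mirrors the shape of the Run method in the hunk above: recover from
// panics, wait for caches, start periodic workers, block until stopped.
func (c *demoController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// Bail out if the node informer cache never syncs.
	if !cache.WaitForCacheSync(stopCh, c.nodeInformerSynced) {
		return
	}

	// Re-run the status monitor every nodeMonitorPeriod until stopCh closes.
	go wait.Until(func() {
		if err := c.monitorNodeStatus(); err != nil {
			utilruntime.HandleError(err)
		}
	}, c.nodeMonitorPeriod, stopCh)

	<-stopCh
}

The main structural difference from the removed Run (further below) is ordering: the taint manager and eviction/tainting workers now start before the status-monitor loop rather than after it.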
@@ -478,7 +416,7 @@ func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error { glog.Warningf("Detected deprecated taint keys: %v on node: %v, will substitute them with %v", taintsToDel, node.GetName(), taintsToAdd) - if !util.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) { + if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) { return fmt.Errorf("failed to swap taints of node %+v", node) } return nil @@ -506,7 +444,7 @@ func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error { if len(taintsToAdd) == 0 && len(taintsToDel) == 0 { return nil } - if !util.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) { + if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) { return fmt.Errorf("failed to swap taints of node %+v", node) } return nil @@ -515,9 +453,9 @@ func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error { func (nc *Controller) doNoExecuteTaintingPass() { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() - for k := range nc.zoneNoExecuteTainer { + for k := range nc.zoneNoExecuteTainter { // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded). - nc.zoneNoExecuteTainer[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) { + nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) { node, err := nc.nodeLister.Get(value.Value) if apierrors.IsNotFound(err) { glog.Warningf("Node %v no longer present in nodeLister!", value.Value) @@ -546,70 +484,37 @@ func (nc *Controller) doNoExecuteTaintingPass() { return true, 0 } - return util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0 + return nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node), 0 }) } } -// Run starts an asynchronous loop that monitors the status of cluster nodes. -func (nc *Controller) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - - glog.Infof("Starting node controller") - defer glog.Infof("Shutting down node controller") - - if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) { - return - } - - // Incorporate the results of node status pushed from kubelet to master. - go wait.Until(func() { - if err := nc.monitorNodeStatus(); err != nil { - glog.Errorf("Error monitoring node status: %v", err) - } - }, nc.nodeMonitorPeriod, wait.NeverStop) - - if nc.runTaintManager { - go nc.taintManager.Run(wait.NeverStop) - } - - if nc.useTaintBasedEvictions { - // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated - // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints. - go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop) - } else { - // Managing eviction of nodes: - // When we delete pods off a node, if the node was not empty at the time we then - // queue an eviction watcher. If we hit an error, retry deletion. 
- go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop) - } - - if nc.allocateNodeCIDRs { - if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType { - go nc.cidrAllocator.Run(wait.NeverStop) - } - } - - <-stopCh -} - -// addPodEvictorForNewZone checks if new zone appeared, and if so add new evictor. -func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) { - zone := utilnode.GetZoneKey(node) - if _, found := nc.zoneStates[zone]; !found { - nc.zoneStates[zone] = stateInitial - if !nc.useTaintBasedEvictions { - nc.zonePodEvictor[zone] = - scheduler.NewRateLimitedTimedQueue( - flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst)) - } else { - nc.zoneNoExecuteTainer[zone] = - scheduler.NewRateLimitedTimedQueue( - flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst)) - } - // Init the metric for the new zone. - glog.Infof("Initializing eviction metric for zone: %v", zone) - evictionsNumber.WithLabelValues(zone).Add(0) +func (nc *Controller) doEvictionPass() { + nc.evictorLock.Lock() + defer nc.evictorLock.Unlock() + for k := range nc.zonePodEvictor { + // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded). + nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) { + node, err := nc.nodeLister.Get(value.Value) + if apierrors.IsNotFound(err) { + glog.Warningf("Node %v no longer present in nodeLister!", value.Value) + } else if err != nil { + glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err) + } else { + zone := utilnode.GetZoneKey(node) + evictionsNumber.WithLabelValues(zone).Inc() + } + nodeUID, _ := value.UID.(string) + remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) + return false, 0 + } + if remaining { + glog.Infof("Pods awaiting deletion due to Controller eviction") + } + return true, 0 + }) } } @@ -631,7 +536,7 @@ func (nc *Controller) monitorNodeStatus() error { for i := range added { glog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name) - util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name)) + nodeutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name)) nc.knownNodeSet[added[i].Name] = added[i] nc.addPodEvictorForNewZone(added[i]) if nc.useTaintBasedEvictions { @@ -643,7 +548,7 @@ func (nc *Controller) monitorNodeStatus() error { for i := range deleted { glog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name) - util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name)) + nodeutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name)) delete(nc.knownNodeSet, deleted[i].Name) } @@ -684,7 +589,7 @@ func (nc *Controller) monitorNodeStatus() error { // We want to update the taint straight away if Node is already tainted with the 
UnreachableTaint if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) { taintToAdd := *NotReadyTaintTemplate - if !util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) { + if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) { glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.") } } else if nc.markNodeForTainting(node) { @@ -711,7 +616,7 @@ func (nc *Controller) monitorNodeStatus() error { // We want to update the taint straight away if Node is already tainted with the UnreachableTaint if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) { taintToAdd := *UnreachableTaintTemplate - if !util.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) { + if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) { glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.") } } else if nc.markNodeForTainting(node) { @@ -751,8 +656,8 @@ func (nc *Controller) monitorNodeStatus() error { // Report node event. if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue { - util.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady") - if err = util.MarkAllPodsNotReady(nc.kubeClient, node); err != nil { + nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady") + if err = nodeutil.MarkAllPodsNotReady(nc.kubeClient, node); err != nil { utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err)) } } @@ -767,13 +672,13 @@ func (nc *Controller) monitorNodeStatus() error { } if !exists { glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name) - util.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) + nodeutil.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) go func(nodeName string) { defer utilruntime.HandleCrash() // Kubelet is not reporting and Cloud Provider says node // is gone. Delete it without worrying about grace // periods. 
- if err := util.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil { + if err := nodeutil.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil { glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err) } }(node.Name) @@ -786,131 +691,6 @@ func (nc *Controller) monitorNodeStatus() error { return nil } -func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) { - newZoneStates := map[string]ZoneState{} - allAreFullyDisrupted := true - for k, v := range zoneToNodeConditions { - zoneSize.WithLabelValues(k).Set(float64(len(v))) - unhealthy, newState := nc.computeZoneStateFunc(v) - zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v))) - unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy)) - if newState != stateFullDisruption { - allAreFullyDisrupted = false - } - newZoneStates[k] = newState - if _, had := nc.zoneStates[k]; !had { - glog.Errorf("Setting initial state for unseen zone: %v", k) - nc.zoneStates[k] = stateInitial - } - } - - allWasFullyDisrupted := true - for k, v := range nc.zoneStates { - if _, have := zoneToNodeConditions[k]; !have { - zoneSize.WithLabelValues(k).Set(0) - zoneHealth.WithLabelValues(k).Set(100) - unhealthyNodes.WithLabelValues(k).Set(0) - delete(nc.zoneStates, k) - continue - } - if v != stateFullDisruption { - allWasFullyDisrupted = false - break - } - } - - // At least one node was responding in previous pass or in the current pass. Semantics is as follows: - // - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use, - // - if the new state is "normal" we resume normal operation (go back to default limiter settings), - // - if new state is "fullDisruption" we restore normal eviction rate, - // - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions. - if !allAreFullyDisrupted || !allWasFullyDisrupted { - // We're switching to full disruption mode - if allAreFullyDisrupted { - glog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.") - for i := range nodes { - if nc.useTaintBasedEvictions { - _, err := nc.markNodeAsReachable(nodes[i]) - if err != nil { - glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name) - } - } else { - nc.cancelPodEviction(nodes[i]) - } - } - // We stop all evictions. - for k := range nc.zoneStates { - if nc.useTaintBasedEvictions { - nc.zoneNoExecuteTainer[k].SwapLimiter(0) - } else { - nc.zonePodEvictor[k].SwapLimiter(0) - } - } - for k := range nc.zoneStates { - nc.zoneStates[k] = stateFullDisruption - } - // All rate limiters are updated, so we can return early here. - return - } - // We're exiting full disruption mode - if allWasFullyDisrupted { - glog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.") - // When exiting disruption mode update probe timestamps on all Nodes. - now := nc.now() - for i := range nodes { - v := nc.nodeStatusMap[nodes[i].Name] - v.probeTimestamp = now - v.readyTransitionTimestamp = now - nc.nodeStatusMap[nodes[i].Name] = v - } - // We reset all rate limiters to settings appropriate for the given state. 
- for k := range nc.zoneStates { - nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k]) - nc.zoneStates[k] = newZoneStates[k] - } - return - } - // We know that there's at least one not-fully disrupted so, - // we can use default behavior for rate limiters - for k, v := range nc.zoneStates { - newState := newZoneStates[k] - if v == newState { - continue - } - glog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState) - nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState) - nc.zoneStates[k] = newState - } - } -} - -func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) { - switch state { - case stateNormal: - if nc.useTaintBasedEvictions { - nc.zoneNoExecuteTainer[zone].SwapLimiter(nc.evictionLimiterQPS) - } else { - nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS) - } - case statePartialDisruption: - if nc.useTaintBasedEvictions { - nc.zoneNoExecuteTainer[zone].SwapLimiter( - nc.enterPartialDisruptionFunc(zoneSize)) - } else { - nc.zonePodEvictor[zone].SwapLimiter( - nc.enterPartialDisruptionFunc(zoneSize)) - } - case stateFullDisruption: - if nc.useTaintBasedEvictions { - nc.zoneNoExecuteTainer[zone].SwapLimiter( - nc.enterFullDisruptionFunc(zoneSize)) - } else { - nc.zonePodEvictor[zone].SwapLimiter( - nc.enterFullDisruptionFunc(zoneSize)) - } - } -} - // tryUpdateNodeStatus checks a given node's conditions and tries to update it. Returns grace period to // which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred. func (nc *Controller) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) { @@ -1082,6 +862,131 @@ func (nc *Controller) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.Node return gracePeriod, observedReadyCondition, currentReadyCondition, err } +func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) { + newZoneStates := map[string]ZoneState{} + allAreFullyDisrupted := true + for k, v := range zoneToNodeConditions { + zoneSize.WithLabelValues(k).Set(float64(len(v))) + unhealthy, newState := nc.computeZoneStateFunc(v) + zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v))) + unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy)) + if newState != stateFullDisruption { + allAreFullyDisrupted = false + } + newZoneStates[k] = newState + if _, had := nc.zoneStates[k]; !had { + glog.Errorf("Setting initial state for unseen zone: %v", k) + nc.zoneStates[k] = stateInitial + } + } + + allWasFullyDisrupted := true + for k, v := range nc.zoneStates { + if _, have := zoneToNodeConditions[k]; !have { + zoneSize.WithLabelValues(k).Set(0) + zoneHealth.WithLabelValues(k).Set(100) + unhealthyNodes.WithLabelValues(k).Set(0) + delete(nc.zoneStates, k) + continue + } + if v != stateFullDisruption { + allWasFullyDisrupted = false + break + } + } + + // At least one node was responding in previous pass or in the current pass. Semantics is as follows: + // - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use, + // - if the new state is "normal" we resume normal operation (go back to default limiter settings), + // - if new state is "fullDisruption" we restore normal eviction rate, + // - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions. 
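// Editorial aside, not part of the patch: a self-contained sketch of the limiter
// semantics described in the comment above. zoneQPS, zoneState and the constants
// below are hypothetical stand-ins for the controller's setLimiterInZone,
// ReducedQPSFunc and HealthyQPSFunc (shown further down in this file); the
// numeric values are examples only.
package main

import "fmt"

type zoneState string

const (
	stateNormal            zoneState = "Normal"
	statePartialDisruption zoneState = "PartialDisruption"
	stateFullDisruption    zoneState = "FullDisruption"
)

// zoneQPS returns the eviction (or NoExecute-tainting) rate for one zone:
//   - normal zones run at the default limiter QPS,
//   - partially disrupted zones fall back to the secondary QPS only when the
//     zone is larger than the large-cluster threshold, otherwise evictions stop,
//   - a fully disrupted zone keeps the default QPS, unless every zone in the
//     cluster is fully disrupted, in which case all evictions stop.
func zoneQPS(state zoneState, zoneSize int, allZonesFullyDisrupted bool) float32 {
	const (
		evictionLimiterQPS          float32 = 0.1
		secondaryEvictionLimiterQPS float32 = 0.01
		largeClusterThreshold               = 50
	)
	if allZonesFullyDisrupted {
		return 0
	}
	switch state {
	case statePartialDisruption:
		if zoneSize > largeClusterThreshold {
			return secondaryEvictionLimiterQPS
		}
		return 0
	default: // stateNormal, stateFullDisruption
		return evictionLimiterQPS
	}
}

func main() {
	fmt.Println(zoneQPS(statePartialDisruption, 60, false)) // 0.01
	fmt.Println(zoneQPS(statePartialDisruption, 10, false)) // 0
	fmt.Println(zoneQPS(stateFullDisruption, 10, true))     // 0
}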
+ if !allAreFullyDisrupted || !allWasFullyDisrupted { + // We're switching to full disruption mode + if allAreFullyDisrupted { + glog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.") + for i := range nodes { + if nc.useTaintBasedEvictions { + _, err := nc.markNodeAsReachable(nodes[i]) + if err != nil { + glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name) + } + } else { + nc.cancelPodEviction(nodes[i]) + } + } + // We stop all evictions. + for k := range nc.zoneStates { + if nc.useTaintBasedEvictions { + nc.zoneNoExecuteTainter[k].SwapLimiter(0) + } else { + nc.zonePodEvictor[k].SwapLimiter(0) + } + } + for k := range nc.zoneStates { + nc.zoneStates[k] = stateFullDisruption + } + // All rate limiters are updated, so we can return early here. + return + } + // We're exiting full disruption mode + if allWasFullyDisrupted { + glog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.") + // When exiting disruption mode update probe timestamps on all Nodes. + now := nc.now() + for i := range nodes { + v := nc.nodeStatusMap[nodes[i].Name] + v.probeTimestamp = now + v.readyTransitionTimestamp = now + nc.nodeStatusMap[nodes[i].Name] = v + } + // We reset all rate limiters to settings appropriate for the given state. + for k := range nc.zoneStates { + nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k]) + nc.zoneStates[k] = newZoneStates[k] + } + return + } + // We know that there's at least one not-fully disrupted so, + // we can use default behavior for rate limiters + for k, v := range nc.zoneStates { + newState := newZoneStates[k] + if v == newState { + continue + } + glog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState) + nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState) + nc.zoneStates[k] = newState + } + } +} + +func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) { + switch state { + case stateNormal: + if nc.useTaintBasedEvictions { + nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS) + } else { + nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS) + } + case statePartialDisruption: + if nc.useTaintBasedEvictions { + nc.zoneNoExecuteTainter[zone].SwapLimiter( + nc.enterPartialDisruptionFunc(zoneSize)) + } else { + nc.zonePodEvictor[zone].SwapLimiter( + nc.enterPartialDisruptionFunc(zoneSize)) + } + case stateFullDisruption: + if nc.useTaintBasedEvictions { + nc.zoneNoExecuteTainter[zone].SwapLimiter( + nc.enterFullDisruptionFunc(zoneSize)) + } else { + nc.zonePodEvictor[zone].SwapLimiter( + nc.enterFullDisruptionFunc(zoneSize)) + } + } +} + // classifyNodes classifies the allNodes to three categories: // 1. added: the nodes that in 'allNodes', but not in 'knownNodeSet' // 2. deleted: the nodes that in 'knownNodeSet', but not in 'allNodes' @@ -1116,6 +1021,41 @@ func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZon return } +// HealthyQPSFunc returns the default value for cluster eviction rate - we take +// nodeNum for consistency with ReducedQPSFunc. +func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 { + return nc.evictionLimiterQPS +} + +// ReducedQPSFunc returns the QPS for when a the cluster is large make +// evictions slower, if they're small stop evictions altogether. 
+func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 { + if int32(nodeNum) > nc.largeClusterThreshold { + return nc.secondaryEvictionLimiterQPS + } + return 0 +} + +// addPodEvictorForNewZone checks if new zone appeared, and if so add new evictor. +func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) { + zone := utilnode.GetZoneKey(node) + if _, found := nc.zoneStates[zone]; !found { + nc.zoneStates[zone] = stateInitial + if !nc.useTaintBasedEvictions { + nc.zonePodEvictor[zone] = + scheduler.NewRateLimitedTimedQueue( + flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst)) + } else { + nc.zoneNoExecuteTainter[zone] = + scheduler.NewRateLimitedTimedQueue( + flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst)) + } + // Init the metric for the new zone. + glog.Infof("Initializing eviction metric for zone: %v", zone) + evictionsNumber.WithLabelValues(zone).Add(0) + } +} + // cancelPodEviction removes any queued evictions, typically because the node is available again. It // returns true if an eviction was queued. func (nc *Controller) cancelPodEviction(node *v1.Node) bool { @@ -1141,7 +1081,7 @@ func (nc *Controller) evictPods(node *v1.Node) bool { func (nc *Controller) markNodeForTainting(node *v1.Node) bool { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() - return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)) + return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)) } func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) { @@ -1157,22 +1097,7 @@ func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) { glog.Errorf("Failed to remove taint from node %v: %v", node.Name, err) return false, err } - return nc.zoneNoExecuteTainer[utilnode.GetZoneKey(node)].Remove(node.Name), nil -} - -// HealthyQPSFunc returns the default value for cluster eviction rate - we take -// nodeNum for consistency with ReducedQPSFunc. -func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 { - return nc.evictionLimiterQPS -} - -// ReducedQPSFunc returns the QPS for when a the cluster is large make -// evictions slower, if they're small stop evictions altogether. -func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 { - if int32(nodeNum) > nc.largeClusterThreshold { - return nc.secondaryEvictionLimiterQPS - } - return 0 + return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Remove(node.Name), nil } // ComputeZoneState returns a slice of NodeReadyConditions for all Nodes in a given zone. diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go similarity index 93% rename from pkg/controller/node/nodecontroller_test.go rename to pkg/controller/nodelifecycle/node_lifecycle_controller_test.go index a871dcbc6e..93affbb9d6 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,10 +14,9 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package node +package nodelifecycle import ( - "net" "strings" "testing" "time" @@ -39,10 +38,9 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/controller/node/ipam" - "k8s.io/kubernetes/pkg/controller/node/scheduler" - "k8s.io/kubernetes/pkg/controller/node/util" + "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler" "k8s.io/kubernetes/pkg/controller/testutil" + nodeutil "k8s.io/kubernetes/pkg/controller/util/node" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/util/node" taintutils "k8s.io/kubernetes/pkg/util/taints" @@ -60,13 +58,46 @@ const ( func alwaysReady() bool { return true } -type nodeController struct { +type nodeLifecycleController struct { *Controller nodeInformer coreinformers.NodeInformer daemonSetInformer extensionsinformers.DaemonSetInformer } -func newNodeControllerFromClient( +// doEviction does the fake eviction and returns the status of eviction operation. +func (nc *nodeLifecycleController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler) bool { + var podEvicted bool + zones := testutil.GetZones(fakeNodeHandler) + for _, zone := range zones { + nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { + uid, _ := value.UID.(string) + nodeutil.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore) + return true, 0 + }) + } + + for _, action := range fakeNodeHandler.Actions() { + if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { + podEvicted = true + return podEvicted + } + } + return podEvicted +} + +func (nc *nodeLifecycleController) syncNodeStore(fakeNodeHandler *testutil.FakeNodeHandler) error { + nodes, err := fakeNodeHandler.List(metav1.ListOptions{}) + if err != nil { + return err + } + newElems := make([]interface{}, 0, len(nodes.Items)) + for i := range nodes.Items { + newElems = append(newElems, &nodes.Items[i]) + } + return nc.nodeInformer.Informer().GetStore().Replace(newElems, "newRV") +} + +func newNodeLifecycleControllerFromClient( cloud cloudprovider.Interface, kubeClient clientset.Interface, podEvictionTimeout time.Duration, @@ -77,37 +108,28 @@ func newNodeControllerFromClient( nodeMonitorGracePeriod time.Duration, nodeStartupGracePeriod time.Duration, nodeMonitorPeriod time.Duration, - clusterCIDR *net.IPNet, - serviceCIDR *net.IPNet, - nodeCIDRMaskSize int, - allocateNodeCIDRs bool, useTaints bool, -) (*nodeController, error) { +) (*nodeLifecycleController, error) { factory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) nodeInformer := factory.Core().V1().Nodes() daemonSetInformer := factory.Extensions().V1beta1().DaemonSets() - nc, err := NewNodeController( + nc, err := NewNodeLifecycleController( factory.Core().V1().Pods(), nodeInformer, daemonSetInformer, cloud, kubeClient, + nodeMonitorPeriod, + nodeStartupGracePeriod, + nodeMonitorGracePeriod, podEvictionTimeout, evictionLimiterQPS, secondaryEvictionLimiterQPS, largeClusterThreshold, unhealthyZoneThreshold, - nodeMonitorGracePeriod, - nodeStartupGracePeriod, - nodeMonitorPeriod, - clusterCIDR, - serviceCIDR, - nodeCIDRMaskSize, - allocateNodeCIDRs, - ipam.RangeAllocatorType, useTaints, useTaints, useTaints, @@ -120,19 +142,7 @@ func newNodeControllerFromClient( nc.nodeInformerSynced = alwaysReady nc.daemonSetInformerSynced = alwaysReady - return &nodeController{nc, nodeInformer, daemonSetInformer}, nil -} - -func 
syncNodeStore(nc *nodeController, fakeNodeHandler *testutil.FakeNodeHandler) error { - nodes, err := fakeNodeHandler.List(metav1.ListOptions{}) - if err != nil { - return err - } - newElems := make([]interface{}, 0, len(nodes.Items)) - for i := range nodes.Items { - newElems = append(newElems, &nodes.Items[i]) - } - return nc.nodeInformer.Informer().GetStore().Replace(newElems, "newRV") + return &nodeLifecycleController{nc, nodeInformer, daemonSetInformer}, nil } func TestMonitorNodeStatusEvictPods(t *testing.T) { @@ -597,7 +607,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } for _, item := range table { - nodeController, _ := newNodeControllerFromClient( + nodeController, _ := newNodeLifecycleControllerFromClient( nil, item.fakeNodeHandler, evictionTimeout, @@ -608,17 +618,13 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, - nil, - nil, - 0, - false, false) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() for _, ds := range item.daemonSets { nodeController.daemonSetInformer.Informer().GetStore().Add(&ds) } - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -633,7 +639,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { item.fakeNodeHandler.Existing[0].Labels = labels item.fakeNodeHandler.Existing[1].Labels = labels } - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -644,7 +650,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { if _, ok := nodeController.zonePodEvictor[zone]; ok { nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nodeUID, _ := value.UID.(string) - util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister()) + nodeutil.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister()) return true, 0 }) } else { @@ -763,12 +769,21 @@ func TestPodStatusChange(t *testing.T) { } for _, item := range table { - nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, - evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + item.fakeNodeHandler, + evictionTimeout, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -779,7 +794,7 @@ func TestPodStatusChange(t *testing.T) { 
item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus item.fakeNodeHandler.Existing[1].Status = item.secondNodeNewStatus } - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -789,7 +804,7 @@ func TestPodStatusChange(t *testing.T) { for _, zone := range zones { nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nodeUID, _ := value.UID.(string) - util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore) + nodeutil.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore) return true, 0 }) } @@ -809,7 +824,6 @@ func TestPodStatusChange(t *testing.T) { t.Errorf("expected pod update: %+v, got %+v for %+v", podReasonUpdate, item.expectedPodUpdate, item.description) } } - } func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { @@ -1280,9 +1294,18 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { Existing: item.nodeList, Clientset: fake.NewSimpleClientset(&v1.PodList{Items: item.podList}), } - nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, - evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fakeNodeHandler, + evictionTimeout, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.now = func() metav1.Time { return fakeNow } nodeController.enterPartialDisruptionFunc = func(nodeNum int) float32 { return testRateLimiterQPS @@ -1291,7 +1314,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { nodeController.enterFullDisruptionFunc = func(nodeNum int) float32 { return testRateLimiterQPS } - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1309,7 +1332,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { fakeNodeHandler.Existing[i].Status = item.updatedNodeStatuses[i] } - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1337,27 +1360,6 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { } } -// doEviction does the fake eviction and returns the status of eviction operation. 
-func (nc *nodeController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler) bool { - var podEvicted bool - zones := testutil.GetZones(fakeNodeHandler) - for _, zone := range zones { - nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { - uid, _ := value.UID.(string) - util.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore) - return true, 0 - }) - } - - for _, action := range fakeNodeHandler.Actions() { - if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { - podEvicted = true - return podEvicted - } - } - return podEvicted -} - // TestCloudProviderNoRateLimit tests that monitorNodes() immediately deletes // pods and the node when kubelet has not reported, and the cloudprovider says // the node is gone. @@ -1384,10 +1386,18 @@ func TestCloudProviderNoRateLimit(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}), DeleteWaitChan: make(chan struct{}), } - nodeController, _ := newNodeControllerFromClient(nil, fnh, 10*time.Minute, - testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, - testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fnh, + 10*time.Minute, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.cloud = &fakecloud.FakeCloud{} nodeController.now = func() metav1.Time { return metav1.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) } nodeController.recorder = testutil.NewFakeRecorder() @@ -1395,7 +1405,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) { return false, nil } // monitorNodeStatus should allow this node to be immediately deleted - if err := syncNodeStore(nodeController, fnh); err != nil { + if err := nodeController.syncNodeStore(fnh); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1624,12 +1634,21 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for i, item := range table { - nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, - testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + item.fakeNodeHandler, + 5*time.Minute, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1638,7 +1657,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { if item.timeToPass > 0 { nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(item.timeToPass)} } item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus - if err := 
syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1768,12 +1787,21 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) { } for i, item := range table { - nodeController, _ := newNodeControllerFromClient(nil, item.fakeNodeHandler, 5*time.Minute, - testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + item.fakeNodeHandler, + 5*time.Minute, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1782,7 +1810,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) { if item.timeToPass > 0 { nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(item.timeToPass)} } item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus - if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1879,12 +1907,21 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) { originalTaint := UnreachableTaintTemplate updatedTaint := NotReadyTaintTemplate - nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, - evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fakeNodeHandler, + evictionTimeout, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + true) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1922,7 +1959,7 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) { return } - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -1972,9 +2009,18 @@ func TestTaintsNodeByCondition(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), } - nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout, - 
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fakeNodeHandler, + evictionTimeout, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + true) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() @@ -2098,11 +2144,11 @@ func TestTaintsNodeByCondition(t *testing.T) { for _, test := range tests { fakeNodeHandler.Update(test.Node) - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } nodeController.doNoScheduleTaintingPass(test.Node) - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } node0, err := nodeController.nodeLister.Get("node0") @@ -2150,10 +2196,18 @@ func TestNodeEventGeneration(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), } - nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute, - testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, - testNodeMonitorPeriod, nil, nil, 0, false, false) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fakeNodeHandler, + 5*time.Minute, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + false) nodeController.cloud = &fakecloud.FakeCloud{} nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) { return false, nil @@ -2161,7 +2215,7 @@ func TestNodeEventGeneration(t *testing.T) { nodeController.now = func() metav1.Time { return fakeNow } fakeRecorder := testutil.NewFakeRecorder() nodeController.recorder = fakeRecorder - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } if err := nodeController.monitorNodeStatus(); err != nil { @@ -2208,9 +2262,18 @@ func TestFixDeprecatedTaintKey(t *testing.T) { Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), } - nodeController, _ := newNodeControllerFromClient(nil, fakeNodeHandler, evictionTimeout, - testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealthyThreshold, testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false, true) + nodeController, _ := newNodeLifecycleControllerFromClient( + nil, + fakeNodeHandler, + evictionTimeout, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + true) nodeController.now = func() metav1.Time { return fakeNow } nodeController.recorder = testutil.NewFakeRecorder() @@ -2319,11 +2382,11 @@ func TestFixDeprecatedTaintKey(t *testing.T) { for _, test := range tests { 
fakeNodeHandler.Update(test.Node) - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } nodeController.doFixDeprecatedTaintKeyPass(test.Node) - if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil { + if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil { t.Errorf("unexpected error: %v", err) } node, err := nodeController.nodeLister.Get(test.Node.GetName()) diff --git a/pkg/controller/node/scheduler/BUILD b/pkg/controller/nodelifecycle/scheduler/BUILD similarity index 85% rename from pkg/controller/node/scheduler/BUILD rename to pkg/controller/nodelifecycle/scheduler/BUILD index efe8ad0b8c..c9d54bd628 100644 --- a/pkg/controller/node/scheduler/BUILD +++ b/pkg/controller/nodelifecycle/scheduler/BUILD @@ -1,39 +1,14 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", -) - -go_test( - name = "go_default_test", - srcs = [ - "rate_limited_queue_test.go", - "taint_controller_test.go", - "timed_workers_test.go", - ], - embed = [":go_default_library"], - importpath = "k8s.io/kubernetes/pkg/controller/node/scheduler", - deps = [ - "//pkg/controller/testutil:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", - "//vendor/k8s.io/client-go/testing:go_default_library", - "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", - ], -) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ "rate_limited_queue.go", - "taint_controller.go", + "taint_manager.go", "timed_workers.go", ], - importpath = "k8s.io/kubernetes/pkg/controller/node/scheduler", + importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler", + visibility = ["//visibility:public"], deps = [ "//pkg/apis/core/helper:go_default_library", "//pkg/apis/core/v1/helper:go_default_library", @@ -53,6 +28,26 @@ go_library( ], ) +go_test( + name = "go_default_test", + srcs = [ + "rate_limited_queue_test.go", + "taint_manager_test.go", + "timed_workers_test.go", + ], + embed = [":go_default_library"], + importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler", + deps = [ + "//pkg/controller/testutil:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", + ], +) + filegroup( name = "package-srcs", srcs = glob(["**"]), @@ -64,4 +59,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/pkg/controller/node/scheduler/rate_limited_queue.go b/pkg/controller/nodelifecycle/scheduler/rate_limited_queue.go similarity index 100% rename from pkg/controller/node/scheduler/rate_limited_queue.go rename to pkg/controller/nodelifecycle/scheduler/rate_limited_queue.go diff --git a/pkg/controller/node/scheduler/rate_limited_queue_test.go 
b/pkg/controller/nodelifecycle/scheduler/rate_limited_queue_test.go similarity index 100% rename from pkg/controller/node/scheduler/rate_limited_queue_test.go rename to pkg/controller/nodelifecycle/scheduler/rate_limited_queue_test.go diff --git a/pkg/controller/node/scheduler/taint_controller.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go similarity index 99% rename from pkg/controller/node/scheduler/taint_controller.go rename to pkg/controller/nodelifecycle/scheduler/taint_manager.go index bf641f69fb..a71fa8fc78 100644 --- a/pkg/controller/node/scheduler/taint_controller.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go @@ -18,9 +18,6 @@ package scheduler import ( "fmt" - "sync" - "time" - "k8s.io/api/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/apis/core/helper" @@ -30,6 +27,8 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "sync" + "time" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" diff --git a/pkg/controller/node/scheduler/taint_controller_test.go b/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go similarity index 100% rename from pkg/controller/node/scheduler/taint_controller_test.go rename to pkg/controller/nodelifecycle/scheduler/taint_manager_test.go diff --git a/pkg/controller/node/scheduler/timed_workers.go b/pkg/controller/nodelifecycle/scheduler/timed_workers.go similarity index 100% rename from pkg/controller/node/scheduler/timed_workers.go rename to pkg/controller/nodelifecycle/scheduler/timed_workers.go diff --git a/pkg/controller/node/scheduler/timed_workers_test.go b/pkg/controller/nodelifecycle/scheduler/timed_workers_test.go similarity index 100% rename from pkg/controller/node/scheduler/timed_workers_test.go rename to pkg/controller/nodelifecycle/scheduler/timed_workers_test.go diff --git a/pkg/controller/node/util/BUILD b/pkg/controller/util/node/BUILD similarity index 87% rename from pkg/controller/node/util/BUILD rename to pkg/controller/util/node/BUILD index b14d0cb926..8a46251040 100644 --- a/pkg/controller/node/util/BUILD +++ b/pkg/controller/util/node/BUILD @@ -1,14 +1,10 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", srcs = ["controller_utils.go"], - importpath = "k8s.io/kubernetes/pkg/controller/node/util", + importpath = "k8s.io/kubernetes/pkg/controller/util/node", + visibility = ["//visibility:public"], deps = [ "//pkg/apis/core:go_default_library", "//pkg/cloudprovider:go_default_library", @@ -41,4 +37,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/pkg/controller/node/util/controller_utils.go b/pkg/controller/util/node/controller_utils.go similarity index 96% rename from pkg/controller/node/util/controller_utils.go rename to pkg/controller/util/node/controller_utils.go index 643defce96..4c9a9279ac 100644 --- a/pkg/controller/node/util/controller_utils.go +++ b/pkg/controller/util/node/controller_utils.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package util +package node import ( "errors" @@ -170,9 +170,9 @@ func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error { return fmt.Errorf("%v", strings.Join(errMsg, "; ")) } -// NodeExistsInCloudProvider returns true if the node exists in the +// ExistsInCloudProvider returns true if the node exists in the // cloud provider. -func NodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) { +func ExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) { instances, ok := cloud.Instances() if !ok { return false, fmt.Errorf("%v", ErrCloudInstance) @@ -198,7 +198,7 @@ func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event) } -// RecordNodeStatusChange records a event related to a node status change. +// RecordNodeStatusChange records a event related to a node status change. (Common to lifecycle and ipam) func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) { ref := &v1.ObjectReference{ Kind: "Node", @@ -257,7 +257,7 @@ func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) { } } -// CreateUpdateNodeHandler creates a node update handler. +// CreateUpdateNodeHandler creates a node update handler. (Common to lifecycle and ipam) func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) { return func(origOldObj, origNewObj interface{}) { node := origNewObj.(*v1.Node).DeepCopy() @@ -269,7 +269,7 @@ func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldOb } } -// CreateDeleteNodeHandler creates a delete node handler. +// CreateDeleteNodeHandler creates a delete node handler. 
(Common to lifecycle and ipam) func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) { return func(originalObj interface{}) { originalNode, isNode := originalObj.(*v1.Node) diff --git a/test/e2e/apps/BUILD b/test/e2e/apps/BUILD index b6bb7d4fbe..d7194290d6 100644 --- a/test/e2e/apps/BUILD +++ b/test/e2e/apps/BUILD @@ -33,7 +33,7 @@ go_library( "//pkg/controller/daemon:go_default_library", "//pkg/controller/deployment/util:go_default_library", "//pkg/controller/job:go_default_library", - "//pkg/controller/node:go_default_library", + "//pkg/controller/nodelifecycle:go_default_library", "//pkg/controller/replicaset:go_default_library", "//pkg/controller/replication:go_default_library", "//pkg/kubectl:go_default_library", diff --git a/test/e2e/apps/network_partition.go b/test/e2e/apps/network_partition.go index 4cabc2b46b..b9a22afc48 100644 --- a/test/e2e/apps/network_partition.go +++ b/test/e2e/apps/network_partition.go @@ -33,7 +33,7 @@ import ( "k8s.io/api/core/v1" clientset "k8s.io/client-go/kubernetes" api "k8s.io/kubernetes/pkg/apis/core" - nodepkg "k8s.io/kubernetes/pkg/controller/node" + nodepkg "k8s.io/kubernetes/pkg/controller/nodelifecycle" "k8s.io/kubernetes/test/e2e/common" "k8s.io/kubernetes/test/e2e/framework" testutils "k8s.io/kubernetes/test/utils" diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index 66bab803c1..9f7e222fed 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -57,7 +57,7 @@ go_library( "//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/deployment/util:go_default_library", - "//pkg/controller/node:go_default_library", + "//pkg/controller/nodelifecycle:go_default_library", "//pkg/features:go_default_library", "//pkg/kubectl:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index bdf52dfdd9..58c01bf61d 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -86,7 +86,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider/providers/azure" gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/controller" - nodectlr "k8s.io/kubernetes/pkg/controller/node" + nodectlr "k8s.io/kubernetes/pkg/controller/nodelifecycle" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubelet/util/format" diff --git a/test/integration/garbagecollector/BUILD b/test/integration/garbagecollector/BUILD index 8bd1651d13..d6a537df07 100644 --- a/test/integration/garbagecollector/BUILD +++ b/test/integration/garbagecollector/BUILD @@ -1,9 +1,4 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_test", -) +load("@io_bazel_rules_go//go:def.bzl", "go_test") go_test( name = "go_default_test", @@ -52,4 +47,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index 0fbdc42db6..ec4b7bb01a 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -30,8 +30,7 @@ go_test( "//pkg/apis/core/v1/helper:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/informers/informers_generated/internalversion:go_default_library", - "//pkg/controller/node:go_default_library", - "//pkg/controller/node/ipam:go_default_library", + 
"//pkg/controller/nodelifecycle:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library", "//pkg/features:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", diff --git a/test/integration/scheduler/taint_test.go b/test/integration/scheduler/taint_test.go index da8fc51aa0..bb227ba4dd 100644 --- a/test/integration/scheduler/taint_test.go +++ b/test/integration/scheduler/taint_test.go @@ -37,8 +37,7 @@ import ( "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" internalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" - "k8s.io/kubernetes/pkg/controller/node" - "k8s.io/kubernetes/pkg/controller/node/ipam" + "k8s.io/kubernetes/pkg/controller/nodelifecycle" kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction" pluginapi "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction" @@ -85,29 +84,24 @@ func TestTaintNodeByCondition(t *testing.T) { controllerCh := make(chan struct{}) defer close(controllerCh) - // Start NodeController for taint. - nc, err := node.NewNodeController( + // Start NodeLifecycleController for taint. + nc, err := nodelifecycle.NewNodeLifecycleController( informers.Core().V1().Pods(), informers.Core().V1().Nodes(), informers.Extensions().V1beta1().DaemonSets(), nil, // CloudProvider clientset, + time.Second, // Node monitor grace period + time.Second, // Node startup grace period + time.Second, // Node monitor period time.Second, // Pod eviction timeout 100, // Eviction limiter QPS 100, // Secondary eviction limiter QPS 100, // Large cluster threshold 100, // Unhealthy zone threshold - time.Second, // Node monitor grace period - time.Second, // Node startup grace period - time.Second, // Node monitor period - nil, // Cluster CIDR - nil, // Service CIDR - 0, // Node CIDR mask size - false, // Allocate node CIDRs - ipam.RangeAllocatorType, // Allocator type - true, // Run taint manger - true, // Enabled taint based eviction - true, // Enabled TaintNodeByCondition feature + true, // Run taint manager + true, // Use taint based evictions + true, // Enabled TaintNodeByCondition feature ) if err != nil { t.Errorf("Failed to create node controller: %v", err)