Break up node controller into packages

This change makes NO actual code changes other than moving constituent
parts into packages.
Bowei Du 2017-08-08 12:55:57 -07:00
parent c9d142d73d
commit 27854fa0d8
23 changed files with 469 additions and 236 deletions
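
For orientation before the per-file diffs: the pieces that were package-local in pkg/controller/node now live in the subpackages ipam (and ipam/cidrset), scheduler, and util, and call sites switch to the exported names. A minimal, non-authoritative sketch of the new import surface, using only identifiers that appear in the diffs below (the snippet itself is illustrative and not part of the change):

package example

import (
	"k8s.io/kubernetes/pkg/controller/node/ipam"
	"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
	"k8s.io/kubernetes/pkg/controller/node/scheduler"
	"k8s.io/kubernetes/pkg/controller/node/util"
)

// CIDR allocation moved to ipam; the bitmap it uses is the exported cidrset.CidrSet.
var allocatorType ipam.CIDRAllocatorType = ipam.RangeAllocatorType
var _ *cidrset.CidrSet

// The eviction/taint queues moved to scheduler (zonePodEvictor, zoneNoExecuteTainer).
var _ map[string]*scheduler.RateLimitedTimedQueue

// Shared node/pod helpers moved to util, e.g. RecordNodeStatusChange and DeletePods.
var _ = util.RecordNodeStatusChange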


@ -63,6 +63,7 @@ go_library(
"//pkg/controller/job:go_default_library",
"//pkg/controller/namespace:go_default_library",
"//pkg/controller/node:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/podautoscaler:go_default_library",
"//pkg/controller/podautoscaler/metrics:go_default_library",
"//pkg/controller/podgc:go_default_library",


@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/controller/garbagecollector"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
"k8s.io/kubernetes/pkg/controller/node/ipam"
"k8s.io/kubernetes/pkg/controller/podgc"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
@ -108,7 +109,7 @@ func startNodeController(ctx ControllerContext) (bool, error) {
serviceCIDR,
int(ctx.Options.NodeCIDRMaskSize),
ctx.Options.AllocateNodeCIDRs,
nodecontroller.CIDRAllocatorType(ctx.Options.CIDRAllocatorType),
ipam.CIDRAllocatorType(ctx.Options.CIDRAllocatorType),
ctx.Options.EnableTaintManager,
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
)


@ -10,25 +10,20 @@ load(
go_test(
name = "go_default_test",
srcs = [
"cidr_allocator_test.go",
"cidr_set_test.go",
"nodecontroller_test.go",
"rate_limited_queue_test.go",
"taint_controller_test.go",
"timed_workers_test.go",
],
srcs = ["nodecontroller_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/node/scheduler:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/controller/testutil:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/taints:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
@ -36,7 +31,6 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
@ -44,35 +38,24 @@ go_test(
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"cidr_allocator.go",
"cidr_set.go",
"cloud_cidr_allocator.go",
"controller_utils.go",
"doc.go",
"metrics.go",
"node_controller.go",
"range_allocator.go",
"rate_limited_queue.go",
"taint_controller.go",
"timed_workers.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/helper:go_default_library",
"//pkg/api/v1/helper:go_default_library",
"//pkg/api/v1/node:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/controller/node/ipam:go_default_library",
"//pkg/controller/node/scheduler:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/system:go_default_library",
@ -88,9 +71,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/informers/extensions/v1beta1:go_default_library",
@ -102,7 +83,6 @@ go_library(
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
@ -115,6 +95,11 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//pkg/controller/node/ipam:all-srcs",
"//pkg/controller/node/scheduler:all-srcs",
"//pkg/controller/node/util:all-srcs",
],
tags = ["automanaged"],
)


@ -0,0 +1,68 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["cidr_allocator_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/controller/testutil:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"cidr_allocator.go",
"cloud_cidr_allocator.go",
"range_allocator.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/controller/node/ipam/cidrset:go_default_library",
"//pkg/controller/node/util:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/controller/node/ipam/cidrset:all-srcs",
],
tags = ["automanaged"],
)


@ -0,0 +1,7 @@
approvers:
- bowei
- dnardo
reviewers:
- bowei
- dnardo
- freehan


@ -14,18 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package ipam
import (
"errors"
"net"
v1 "k8s.io/api/core/v1"
)
var errCIDRRangeNoCIDRsRemaining = errors.New(
"CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
type nodeAndCIDR struct {
cidr *net.IPNet
nodeName string


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package ipam
import (
"net"
@ -143,7 +143,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
return
}
rangeAllocator.recorder = testutil.NewFakeRecorder()
if err = rangeAllocator.cidrs.occupy(cidr); err != nil {
if err = rangeAllocator.cidrs.Occupy(cidr); err != nil {
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
}
}
@ -225,7 +225,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
return
}
rangeAllocator.recorder = testutil.NewFakeRecorder()
err = rangeAllocator.cidrs.occupy(cidr)
err = rangeAllocator.cidrs.Occupy(cidr)
if err != nil {
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
}
@ -337,7 +337,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
return
}
rangeAllocator.recorder = testutil.NewFakeRecorder()
err = rangeAllocator.cidrs.occupy(cidr)
err = rangeAllocator.cidrs.Occupy(cidr)
if err != nil {
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
}


@ -0,0 +1,36 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["cidr_set_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//vendor/github.com/golang/glog:go_default_library"],
)
go_library(
name = "go_default_library",
srcs = ["cidr_set.go"],
tags = ["automanaged"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)


@ -14,17 +14,18 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package cidrset
import (
"encoding/binary"
"errors"
"fmt"
"math/big"
"net"
"sync"
)
type cidrSet struct {
type CidrSet struct {
sync.Mutex
clusterCIDR *net.IPNet
clusterIP net.IP
@ -44,7 +45,12 @@ const (
maxPrefixLength = 64
)
func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet {
var (
ErrCIDRRangeNoCIDRsRemaining = errors.New(
"CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
)
func NewCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *CidrSet {
clusterMask := clusterCIDR.Mask
clusterMaskSize, _ := clusterMask.Size()
@ -54,7 +60,7 @@ func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet {
} else {
maxCIDRs = 1 << uint32(subNetMaskSize-clusterMaskSize)
}
return &cidrSet{
return &CidrSet{
clusterCIDR: clusterCIDR,
clusterIP: clusterCIDR.IP,
clusterMaskSize: clusterMaskSize,
@ -63,7 +69,7 @@ func newCIDRSet(clusterCIDR *net.IPNet, subNetMaskSize int) *cidrSet {
}
}
func (s *cidrSet) indexToCIDRBlock(index int) *net.IPNet {
func (s *CidrSet) indexToCIDRBlock(index int) *net.IPNet {
var ip []byte
var mask int
switch /*v4 or v6*/ {
@ -91,7 +97,7 @@ func (s *cidrSet) indexToCIDRBlock(index int) *net.IPNet {
}
}
func (s *cidrSet) allocateNext() (*net.IPNet, error) {
func (s *CidrSet) AllocateNext() (*net.IPNet, error) {
s.Lock()
defer s.Unlock()
@ -104,7 +110,7 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) {
}
}
if nextUnused == -1 {
return nil, errCIDRRangeNoCIDRsRemaining
return nil, ErrCIDRRangeNoCIDRsRemaining
}
s.nextCandidate = (nextUnused + 1) % s.maxCIDRs
@ -113,7 +119,7 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) {
return s.indexToCIDRBlock(nextUnused), nil
}
func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) {
func (s *CidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) {
begin, end = 0, s.maxCIDRs-1
cidrMask := cidr.Mask
maskSize, _ := cidrMask.Size()
@ -160,7 +166,7 @@ func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err
return begin, end, nil
}
func (s *cidrSet) release(cidr *net.IPNet) error {
func (s *CidrSet) Release(cidr *net.IPNet) error {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
return err
@ -173,7 +179,7 @@ func (s *cidrSet) release(cidr *net.IPNet) error {
return nil
}
func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
func (s *CidrSet) Occupy(cidr *net.IPNet) (err error) {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
return err
@ -188,7 +194,7 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
return nil
}
func (s *cidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) {
func (s *CidrSet) getIndexForCIDR(cidr *net.IPNet) (int, error) {
var cidrIndex uint32
if cidr.IP.To4() != nil {
cidrIndex = (binary.BigEndian.Uint32(s.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-s.subNetMaskSize)
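
With the former cidrSet promoted to the exported cidrset.CidrSet, consumers such as the range allocator in ipam go through NewCIDRSet, Occupy, AllocateNext, and Release. A short illustrative sketch of that surface (the CIDR values are invented for the example; the snippet is not part of the change):

package main

import (
	"fmt"
	"net"

	"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
)

func main() {
	// Carve /24 pod ranges out of a /16 cluster CIDR (256 possible subnets).
	_, clusterCIDR, _ := net.ParseCIDR("10.0.0.0/16")
	set := cidrset.NewCIDRSet(clusterCIDR, 24)

	// Mark a range that is already in use so it is never handed out.
	_, reserved, _ := net.ParseCIDR("10.0.0.0/24")
	if err := set.Occupy(reserved); err != nil {
		fmt.Println("occupy failed:", err)
	}

	// Allocate the next free /24; cidrset.ErrCIDRRangeNoCIDRsRemaining signals exhaustion.
	podCIDR, err := set.AllocateNext()
	if err != nil {
		fmt.Println("allocation failed:", err)
		return
	}
	fmt.Println("allocated", podCIDR)

	// Return the range to the pool when the node goes away.
	if err := set.Release(podCIDR); err != nil {
		fmt.Println("release failed:", err)
	}
}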


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package cidrset
import (
"math/big"
@ -47,9 +47,9 @@ func TestCIDRSetFullyAllocated(t *testing.T) {
}
for _, tc := range cases {
_, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr)
a := newCIDRSet(clusterCIDR, tc.subNetMaskSize)
a := NewCIDRSet(clusterCIDR, tc.subNetMaskSize)
p, err := a.allocateNext()
p, err := a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
@ -58,14 +58,14 @@ func TestCIDRSetFullyAllocated(t *testing.T) {
p.String(), tc.expectedCIDR, tc.description)
}
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
a.release(p)
a.Release(p)
p, err = a.allocateNext()
p, err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
@ -73,7 +73,7 @@ func TestCIDRSetFullyAllocated(t *testing.T) {
t.Fatalf("unexpected allocated cidr: %v, expecting %v for %v",
p.String(), tc.expectedCIDR, tc.description)
}
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
@ -133,7 +133,7 @@ func TestIndexToCIDRBlock(t *testing.T) {
}
for _, tc := range cases {
_, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr)
a := newCIDRSet(clusterCIDR, tc.subnetMaskSize)
a := NewCIDRSet(clusterCIDR, tc.subnetMaskSize)
cidr := a.indexToCIDRBlock(tc.index)
if cidr.String() != tc.CIDRBlock {
t.Fatalf("error for %v index %d %s", tc.description, tc.index, cidr.String())
@ -157,12 +157,12 @@ func TestCIDRSet_RandomishAllocation(t *testing.T) {
}
for _, tc := range cases {
_, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr)
a := newCIDRSet(clusterCIDR, 24)
a := NewCIDRSet(clusterCIDR, 24)
// allocate all the CIDRs
var cidrs []*net.IPNet
for i := 0; i < 256; i++ {
if c, err := a.allocateNext(); err == nil {
if c, err := a.AllocateNext(); err == nil {
cidrs = append(cidrs, c)
} else {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
@ -170,25 +170,25 @@ func TestCIDRSet_RandomishAllocation(t *testing.T) {
}
var err error
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
// release them all
for i := 0; i < len(cidrs); i++ {
a.release(cidrs[i])
a.Release(cidrs[i])
}
// allocate the CIDRs again
var rcidrs []*net.IPNet
for i := 0; i < 256; i++ {
if c, err := a.allocateNext(); err == nil {
if c, err := a.AllocateNext(); err == nil {
rcidrs = append(rcidrs, c)
} else {
t.Fatalf("unexpected error: %d, %v for %v", i, err, tc.description)
}
}
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
@ -215,14 +215,14 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) {
}
for _, tc := range cases {
_, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr)
a := newCIDRSet(clusterCIDR, 24)
a := NewCIDRSet(clusterCIDR, 24)
// allocate all the CIDRs
var cidrs []*net.IPNet
var num_cidrs = 256
for i := 0; i < num_cidrs; i++ {
if c, err := a.allocateNext(); err == nil {
if c, err := a.AllocateNext(); err == nil {
cidrs = append(cidrs, c)
} else {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
@ -230,29 +230,29 @@ func TestCIDRSet_AllocationOccupied(t *testing.T) {
}
var err error
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
// release them all
for i := 0; i < len(cidrs); i++ {
a.release(cidrs[i])
a.Release(cidrs[i])
}
// occupy the last 128 CIDRs
for i := num_cidrs / 2; i < num_cidrs; i++ {
a.occupy(cidrs[i])
a.Occupy(cidrs[i])
}
// allocate the first 128 CIDRs again
var rcidrs []*net.IPNet
for i := 0; i < num_cidrs/2; i++ {
if c, err := a.allocateNext(); err == nil {
if c, err := a.AllocateNext(); err == nil {
rcidrs = append(rcidrs, c)
} else {
t.Fatalf("unexpected error: %d, %v for %v", i, err, tc.description)
}
}
_, err = a.allocateNext()
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range for %v", tc.description)
}
@ -394,7 +394,7 @@ func TestGetBitforCIDR(t *testing.T) {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
cs := newCIDRSet(clusterCIDR, tc.subNetMaskSize)
cs := NewCIDRSet(clusterCIDR, tc.subNetMaskSize)
_, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr)
if err != nil {
@ -562,14 +562,14 @@ func TestOccupy(t *testing.T) {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
cs := newCIDRSet(clusterCIDR, tc.subNetMaskSize)
cs := NewCIDRSet(clusterCIDR, tc.subNetMaskSize)
_, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v for %v", err, tc.description)
}
err = cs.occupy(subnetCIDR)
err = cs.Occupy(subnetCIDR)
if err == nil && tc.expectErr {
t.Errorf("expected error but got none for %v", tc.description)
continue
@ -629,9 +629,9 @@ func TestCIDRSetv6(t *testing.T) {
}
for _, tc := range cases {
_, clusterCIDR, _ := net.ParseCIDR(tc.clusterCIDRStr)
a := newCIDRSet(clusterCIDR, tc.subNetMaskSize)
a := NewCIDRSet(clusterCIDR, tc.subNetMaskSize)
p, err := a.allocateNext()
p, err := a.AllocateNext()
if err == nil && tc.expectErr {
t.Errorf("expected error but got none for %v", tc.description)
continue
@ -645,7 +645,7 @@ func TestCIDRSetv6(t *testing.T) {
t.Fatalf("unexpected allocated cidr: %s for %v", p.String(), tc.description)
}
}
p2, err := a.allocateNext()
p2, err := a.AllocateNext()
if !tc.expectErr {
if p2.String() != tc.expectedCIDR2 {
t.Fatalf("unexpected allocated cidr: %s for %v", p2.String(), tc.description)


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package ipam
import (
"fmt"
@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/controller/node/util"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
@ -79,12 +80,12 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
cidrs, err := ca.cloud.AliasRanges(types.NodeName(node.Name))
if err != nil {
recordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err)
}
if len(cidrs) == 0 {
recordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
glog.V(2).Infof("Node %v has no CIDRs", node.Name)
return fmt.Errorf("failed to allocate cidr (none exist)")
}


@ -14,13 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package ipam
import (
"fmt"
"net"
"sync"
"github.com/golang/glog"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -31,7 +33,8 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
"k8s.io/kubernetes/pkg/controller/node/util"
)
// TODO: figure out the good setting for those constants.
@ -45,7 +48,7 @@ const (
type rangeAllocator struct {
client clientset.Interface
cidrs *cidrSet
cidrs *cidrset.CidrSet
clusterCIDR *net.IPNet
maxCIDRs int
// Channel that is used to pass updating Nodes with assigned CIDRs to the background
@ -74,7 +77,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
ra := &rangeAllocator{
client: client,
cidrs: newCIDRSet(clusterCIDR, subNetMaskSize),
cidrs: cidrset.NewCIDRSet(clusterCIDR, subNetMaskSize),
clusterCIDR: clusterCIDR,
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
recorder: recorder,
@ -150,7 +153,7 @@ func (r *rangeAllocator) occupyCIDR(node *v1.Node) error {
if err != nil {
return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
}
if err := r.cidrs.occupy(podCIDR); err != nil {
if err := r.cidrs.Occupy(podCIDR); err != nil {
return fmt.Errorf("failed to mark cidr as occupied: %v", err)
}
return nil
@ -169,10 +172,10 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
if node.Spec.PodCIDR != "" {
return r.occupyCIDR(node)
}
podCIDR, err := r.cidrs.allocateNext()
podCIDR, err := r.cidrs.AllocateNext()
if err != nil {
r.removeNodeFromProcessing(node.Name)
recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
util.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err)
}
@ -194,7 +197,7 @@ func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error {
}
glog.V(4).Infof("release CIDR %s", node.Spec.PodCIDR)
if err = r.cidrs.release(podCIDR); err != nil {
if err = r.cidrs.Release(podCIDR); err != nil {
return fmt.Errorf("Error when releasing CIDR %v: %v", node.Spec.PodCIDR, err)
}
return err
@ -212,7 +215,7 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
return
}
if err := r.cidrs.occupy(serviceCIDR); err != nil {
if err := r.cidrs.Occupy(serviceCIDR); err != nil {
glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err)
}
}
@ -232,7 +235,7 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
if node.Spec.PodCIDR != "" {
glog.Errorf("Node %v already has allocated CIDR %v. Releasing assigned one if different.", node.Name, node.Spec.PodCIDR)
if node.Spec.PodCIDR != data.cidr.String() {
if err := r.cidrs.release(data.cidr); err != nil {
if err := r.cidrs.Release(data.cidr); err != nil {
glog.Errorf("Error when releasing CIDR %v", data.cidr.String())
}
}
@ -246,13 +249,13 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
}
}
if err != nil {
recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
util.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
// We accept the fact that we may leek CIDRs here. This is safer than releasing
// them in case when we don't know if request went through.
// NodeController restart will return all falsely allocated CIDRs to the pool.
if !apierrors.IsServerTimeout(err) {
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
if releaseErr := r.cidrs.Release(data.cidr); releaseErr != nil {
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
}
}


@ -17,12 +17,13 @@ limitations under the License.
package node
import (
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/golang/glog"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -47,14 +48,15 @@ import (
v1node "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/node/ipam"
"k8s.io/kubernetes/pkg/controller/node/scheduler"
"k8s.io/kubernetes/pkg/controller/node/util"
"k8s.io/kubernetes/pkg/util/metrics"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/system"
taintutils "k8s.io/kubernetes/pkg/util/taints"
utilversion "k8s.io/kubernetes/pkg/util/version"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"github.com/golang/glog"
)
func init() {
@ -63,13 +65,8 @@ func init() {
}
var (
ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
gracefulDeletionVersion = utilversion.MustParseSemantic("v1.1.0")
// The minimum kubelet version for which the nodecontroller
// can safely flip pod.Status to NotReady.
podStatusReconciliationVersion = utilversion.MustParseSemantic("v1.2.0")
UnreachableTaintTemplate = &v1.Taint{
Key: algorithm.TaintNodeUnreachable,
Effect: v1.TaintEffectNoExecute,
@ -82,12 +79,6 @@ var (
)
const (
// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
nodeStatusUpdateRetry = 5
// controls how often NodeController will try to evict Pods from non-responsive Nodes.
nodeEvictionPeriod = 100 * time.Millisecond
// Burst value for all eviction rate limiters
evictionRateLimiterBurst = 1
// The amount of time the nodecontroller polls on the list nodes endpoint.
apiserverStartupGracePeriod = 10 * time.Minute
// The amount of time the nodecontroller should sleep between retrying NodeStatus updates
@ -111,7 +102,7 @@ type nodeStatusData struct {
type NodeController struct {
allocateNodeCIDRs bool
allocatorType CIDRAllocatorType
allocatorType ipam.CIDRAllocatorType
cloud cloudprovider.Interface
clusterCIDR *net.IPNet
@ -150,9 +141,9 @@ type NodeController struct {
// Lock to access evictor workers
evictorLock sync.Mutex
// workers that evicts pods from unresponsive nodes.
zonePodEvictor map[string]*RateLimitedTimedQueue
zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
// workers that are responsible for tainting nodes.
zoneNoExecuteTainer map[string]*RateLimitedTimedQueue
zoneNoExecuteTainer map[string]*scheduler.RateLimitedTimedQueue
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
@ -166,9 +157,9 @@ type NodeController struct {
podInformerSynced cache.InformerSynced
cidrAllocator CIDRAllocator
cidrAllocator ipam.CIDRAllocator
taintManager *NoExecuteTaintManager
taintManager *scheduler.NoExecuteTaintManager
forcefullyDeletePod func(*v1.Pod) error
nodeExistsInCloudProvider func(types.NodeName) (bool, error)
@ -213,7 +204,7 @@ func NewNodeController(
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool,
allocatorType CIDRAllocatorType,
allocatorType ipam.CIDRAllocatorType,
runTaintManager bool,
useTaintBasedEvictions bool) (*NodeController, error) {
eventBroadcaster := record.NewBroadcaster()
@ -247,8 +238,8 @@ func NewNodeController(
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
zonePodEvictor: make(map[string]*RateLimitedTimedQueue),
zoneNoExecuteTainer: make(map[string]*RateLimitedTimedQueue),
zonePodEvictor: make(map[string]*scheduler.RateLimitedTimedQueue),
zoneNoExecuteTainer: make(map[string]*scheduler.RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
@ -259,8 +250,8 @@ func NewNodeController(
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
allocatorType: allocatorType,
forcefullyDeletePod: func(p *v1.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
forcefullyDeletePod: func(p *v1.Pod) error { return util.ForcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) { return util.NodeExistsInCloudProvider(cloud, nodeName) },
evictionLimiterQPS: evictionLimiterQPS,
secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
largeClusterThreshold: largeClusterThreshold,
@ -334,11 +325,11 @@ func NewNodeController(
}
switch nc.allocatorType {
case RangeAllocatorType:
nc.cidrAllocator, err = NewCIDRRangeAllocator(
case ipam.RangeAllocatorType:
nc.cidrAllocator, err = ipam.NewCIDRRangeAllocator(
kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
case CloudAllocatorType:
nc.cidrAllocator, err = NewCloudCIDRAllocator(kubeClient, cloud)
case ipam.CloudAllocatorType:
nc.cidrAllocator, err = ipam.NewCloudCIDRAllocator(kubeClient, cloud)
default:
return nil, fmt.Errorf("Invalid CIDR allocator type: %v", nc.allocatorType)
}
@ -348,8 +339,8 @@ func NewNodeController(
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: createAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR),
UpdateFunc: createUpdateNodeHandler(func(_, newNode *v1.Node) error {
AddFunc: util.CreateAddNodeHandler(nc.cidrAllocator.AllocateOrOccupyCIDR),
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
// If the PodCIDR is not empty we either:
// - already processed a Node that already had a CIDR after NC restarted
// (cidr is marked as used),
@ -374,26 +365,26 @@ func NewNodeController(
}
return nil
}),
DeleteFunc: createDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR),
DeleteFunc: util.CreateDeleteNodeHandler(nc.cidrAllocator.ReleaseCIDR),
})
}
if nc.runTaintManager {
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: createAddNodeHandler(func(node *v1.Node) error {
AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: createUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
UpdateFunc: util.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
nc.taintManager.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: createDeleteNodeHandler(func(node *v1.Node) error {
DeleteFunc: util.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(node, nil)
return nil
}),
})
nc.taintManager = NewNoExecuteTaintManager(kubeClient)
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient)
}
nc.nodeLister = nodeInformer.Lister()
@ -410,7 +401,7 @@ func (nc *NodeController) doEvictionPass() {
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
@ -421,7 +412,7 @@ func (nc *NodeController) doEvictionPass() {
EvictionsNumber.WithLabelValues(zone).Inc()
}
nodeUid, _ := value.UID.(string)
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
remaining, err := util.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
@ -439,7 +430,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() {
defer nc.evictorLock.Unlock()
for k := range nc.zoneNoExecuteTainer {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zoneNoExecuteTainer[k].Try(func(value TimedValue) (bool, time.Duration) {
nc.zoneNoExecuteTainer[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
@ -468,7 +459,7 @@ func (nc *NodeController) doNoExecuteTaintingPass() {
return true, 0
}
return swapNodeControllerTaint(nc.kubeClient, &taintToAdd, &oppositeTaint, node), 0
return util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, &oppositeTaint, node), 0
})
}
}
@ -498,12 +489,12 @@ func (nc *NodeController) Run(stopCh <-chan struct{}) {
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, nodeEvictionPeriod, wait.NeverStop)
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
}
<-stopCh
@ -516,12 +507,12 @@ func (nc *NodeController) addPodEvictorForNewZone(node *v1.Node) {
nc.zoneStates[zone] = stateInitial
if !nc.useTaintBasedEvictions {
nc.zonePodEvictor[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
scheduler.NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
} else {
nc.zoneNoExecuteTainer[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
scheduler.NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
}
// Init the metric for the new zone.
glog.Infof("Initializing eviction metric for zone: %v", zone)
@ -547,7 +538,7 @@ func (nc *NodeController) monitorNodeStatus() error {
for i := range added {
glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
recordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
util.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(added[i])
if nc.useTaintBasedEvictions {
@ -559,7 +550,7 @@ func (nc *NodeController) monitorNodeStatus() error {
for i := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
recordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
util.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
delete(nc.knownNodeSet, deleted[i].Name)
}
@ -574,7 +565,7 @@ func (nc *NodeController) monitorNodeStatus() error {
continue
}
node := nodeCopy.(*v1.Node)
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*nodeStatusUpdateRetry, func() (bool, error) {
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeStatusUpdateRetry, func() (bool, error) {
gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
if err == nil {
return true, nil
@ -605,7 +596,7 @@ func (nc *NodeController) monitorNodeStatus() error {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
if !util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
@ -632,7 +623,7 @@ func (nc *NodeController) monitorNodeStatus() error {
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
if !util.SwapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
glog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
@ -672,8 +663,8 @@ func (nc *NodeController) monitorNodeStatus() error {
// Report node event.
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = markAllPodsNotReady(nc.kubeClient, node); err != nil {
util.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = util.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
@ -688,13 +679,13 @@ func (nc *NodeController) monitorNodeStatus() error {
}
if !exists {
glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
recordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
util.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
go func(nodeName string) {
defer utilruntime.HandleCrash()
// Kubelet is not reporting and Cloud Provider says node
// is gone. Delete it without worrying about grace
// periods.
if err := forcefullyDeleteNode(nc.kubeClient, nodeName); err != nil {
if err := util.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil {
glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
}
}(node.Name)
@ -1122,3 +1113,55 @@ func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeConditi
return notReadyNodes, stateNormal
}
}
// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a Pod %#v", obj)
return
}
}
// consider only terminating pods
if pod.DeletionTimestamp == nil {
return
}
node, err := nc.nodeLister.Get(pod.Spec.NodeName)
// if there is no such node, do nothing and let the podGC clean it up.
if apierrors.IsNotFound(err) {
return
}
if err != nil {
// this can only happen if the Store.KeyFunc has a problem creating
// a key for the pod. If it happens once, it will happen again so
// don't bother requeuing the pod.
utilruntime.HandleError(err)
return
}
// delete terminating pods that have been scheduled on
// nodes that do not support graceful termination
// TODO(mikedanese): this can be removed when we no longer
// guarantee backwards compatibility of master API to kubelets with
// versions less than 1.1.0
v, err := utilversion.ParseSemantic(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.V(0).Infof("Couldn't parse version %q of node: %v", node.Status.NodeInfo.KubeletVersion, err)
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
if v.LessThan(gracefulDeletionVersion) {
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
}


@ -39,6 +39,9 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/node/ipam"
"k8s.io/kubernetes/pkg/controller/node/scheduler"
"k8s.io/kubernetes/pkg/controller/node/util"
"k8s.io/kubernetes/pkg/controller/testutil"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/util/node"
@ -103,7 +106,7 @@ func NewNodeControllerFromClient(
serviceCIDR,
nodeCIDRMaskSize,
allocateNodeCIDRs,
RangeAllocatorType,
ipam.RangeAllocatorType,
useTaints,
useTaints,
)
@ -637,9 +640,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
zones := testutil.GetZones(item.fakeNodeHandler)
for _, zone := range zones {
if _, ok := nodeController.zonePodEvictor[zone]; ok {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister())
util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister())
return true, 0
})
} else {
@ -782,9 +785,9 @@ func TestPodStatusChange(t *testing.T) {
}
zones := testutil.GetZones(item.fakeNodeHandler)
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
util.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
return true, 0
})
}
@ -1337,9 +1340,9 @@ func (nc *nodeController) doEviction(fakeNodeHandler *testutil.FakeNodeHandler)
var podEvicted bool
zones := testutil.GetZones(fakeNodeHandler)
for _, zone := range zones {
nc.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
deletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore)
util.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore)
return true, 0
})
}
@ -2310,7 +2313,7 @@ func TestCheckNodeKubeletVersionParsing(t *testing.T) {
},
},
}
isOutdated := nodeRunningOutdatedKubelet(n)
isOutdated := util.NodeRunningOutdatedKubelet(n)
if ov.outdated != isOutdated {
t.Errorf("Version %v doesn't match test expectation. Expected outdated %v got %v", n.Status.NodeInfo.KubeletVersion, ov.outdated, isOutdated)
} else {


@ -0,0 +1,69 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = [
"rate_limited_queue_test.go",
"taint_controller_test.go",
"timed_workers_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/controller/testutil:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"rate_limited_queue.go",
"taint_controller.go",
"timed_workers.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/helper:go_default_library",
"//pkg/api/v1/helper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"container/heap"
@ -27,6 +27,15 @@ import (
"github.com/golang/glog"
)
const (
// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
NodeStatusUpdateRetry = 5
// controls how often NodeController will try to evict Pods from non-responsive Nodes.
NodeEvictionPeriod = 100 * time.Millisecond
// Burst value for all eviction rate limiters
EvictionRateLimiterBurst = 1
)
// TimedValue is a value that should be processed at a designated time.
type TimedValue struct {
Value string
@ -260,7 +269,7 @@ func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
if newQPS <= 0 {
newLimiter = flowcontrol.NewFakeNeverRateLimiter()
} else {
newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, evictionRateLimiterBurst)
newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, EvictionRateLimiterBurst)
}
// If we're currently waiting on limiter, we drain the new one - this is a good approach when Burst value is 1
// TODO: figure out if we need to support higher Burst values and decide on the drain logic, should we keep:
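
The queue machinery exported here is consumed the same way the node controller uses it above: build one RateLimitedTimedQueue per zone and drain it with Try. A rough sketch under that assumption; the Add call is not shown in this diff and its signature is assumed, and the snippet is illustrative only:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/kubernetes/pkg/controller/node/scheduler"
)

func main() {
	// One queue per zone, mirroring zonePodEvictor / zoneNoExecuteTainer.
	evictor := scheduler.NewRateLimitedTimedQueue(
		flowcontrol.NewTokenBucketRateLimiter(0.1, scheduler.EvictionRateLimiterBurst))

	// Assumed enqueue helper (node name plus UID); not shown in this diff.
	evictor.Add("node-1", "uid-1")

	// Try drains entries as the limiter allows; return (true, 0) on success,
	// or (false, retryAfter) to retry the value later.
	evictor.Try(func(value scheduler.TimedValue) (bool, time.Duration) {
		fmt.Println("evicting pods from node", value.Value)
		return true, 0
	})

	// The controller runs such passes on a timer, e.g.:
	//   go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
}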


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"reflect"


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"fmt"
@ -122,7 +122,7 @@ func getPodsAssignedToNode(c clientset.Interface, nodeName string) ([]v1.Pod, er
time.Sleep(100 * time.Millisecond)
}
if err != nil {
return []v1.Pod{}, fmt.Errorf("Failed to get Pods assigned to node %v. Skipping update.", nodeName)
return []v1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName)
}
return pods.Items, nil
}


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"fmt"


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"sync"


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package scheduler
import (
"sync"


@ -0,0 +1,48 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["controller_utils.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/version:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)


@ -14,13 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package node
package util
import (
"errors"
"fmt"
"strings"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
@ -44,9 +45,17 @@ import (
"github.com/golang/glog"
)
// deletePods will delete all pods from master running on given node, and return true
var (
ErrCloudInstance = errors.New("cloud provider doesn't support instances")
// The minimum kubelet version for which the nodecontroller
// can safely flip pod.Status to NotReady.
podStatusReconciliationVersion = utilversion.MustParseSemantic("v1.2.0")
)
// DeletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
remaining := false
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String()
options := metav1.ListOptions{FieldSelector: selector}
@ -58,7 +67,7 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
}
if len(pods.Items) > 0 {
recordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
}
for _, pod := range pods.Items {
@ -68,8 +77,8 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
}
// Set reason and message in the pod object.
if _, err = setPodTerminationReason(kubeClient, &pod, nodeName); err != nil {
if errors.IsConflict(err) {
if _, err = SetPodTerminationReason(kubeClient, &pod, nodeName); err != nil {
if apierrors.IsConflict(err) {
updateErrList = append(updateErrList,
fmt.Errorf("update status failed for pod %q: %v", format.Pod(&pod), err))
continue
@ -100,9 +109,9 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
return remaining, nil
}
// setPodTerminationReason attempts to set a reason and message in the pod status, updates it in the apiserver,
// SetPodTerminationReason attempts to set a reason and message in the pod status, updates it in the apiserver,
// and returns an error if it encounters one.
func setPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
return pod, nil
}
@ -118,7 +127,7 @@ func setPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeNa
return updatedPod, nil
}
func forcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error {
func ForcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error {
var zero int64
glog.Infof("NodeController is force deleting Pod: %v:%v", pod.Namespace, pod.Name)
err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &zero})
@ -128,75 +137,23 @@ func forcefullyDeletePod(c clientset.Interface, pod *v1.Pod) error {
return err
}
// forcefullyDeleteNode immediately the node. The pods on the node are cleaned
// ForcefullyDeleteNode immediately the node. The pods on the node are cleaned
// up by the podGC.
func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error {
func ForcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error {
if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
}
return nil
}
// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a Pod %#v", obj)
return
}
}
// consider only terminating pods
if pod.DeletionTimestamp == nil {
return
}
node, err := nc.nodeLister.Get(pod.Spec.NodeName)
// if there is no such node, do nothing and let the podGC clean it up.
if errors.IsNotFound(err) {
return
}
if err != nil {
// this can only happen if the Store.KeyFunc has a problem creating
// a key for the pod. If it happens once, it will happen again so
// don't bother requeuing the pod.
utilruntime.HandleError(err)
return
}
// delete terminating pods that have been scheduled on
// nodes that do not support graceful termination
// TODO(mikedanese): this can be removed when we no longer
// guarantee backwards compatibility of master API to kubelets with
// versions less than 1.1.0
v, err := utilversion.ParseSemantic(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.V(0).Infof("Couldn't parse version %q of node: %v", node.Status.NodeInfo.KubeletVersion, err)
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
if v.LessThan(gracefulDeletionVersion) {
utilruntime.HandleError(nc.forcefullyDeletePod(pod))
return
}
}
// update ready status of all pods running on given node from master
// return true if success
func markAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
// Don't set pods to NotReady if the kubelet is running a version that
// doesn't understand how to correct readiness.
// TODO: Remove this check when we no longer guarantee backward compatibility
// with node versions < 1.2.0.
if nodeRunningOutdatedKubelet(node) {
if NodeRunningOutdatedKubelet(node) {
return nil
}
nodeName := node.Name
@ -233,11 +190,11 @@ func markAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}
// nodeRunningOutdatedKubelet returns true if the kubeletVersion reported
// NodeRunningOutdatedKubelet returns true if the kubeletVersion reported
// in the nodeInfo of the given node is "outdated", meaning < 1.2.0.
// Older versions were inflexible and modifying pod.Status directly through
// the apiserver would result in unexpected outcomes.
func nodeRunningOutdatedKubelet(node *v1.Node) bool {
func NodeRunningOutdatedKubelet(node *v1.Node) bool {
v, err := utilversion.ParseSemantic(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.Errorf("couldn't parse version %q of node %v", node.Status.NodeInfo.KubeletVersion, err)
@ -250,7 +207,7 @@ func nodeRunningOutdatedKubelet(node *v1.Node) bool {
return false
}
func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) {
func NodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
return false, fmt.Errorf("%v", ErrCloudInstance)
@ -264,7 +221,7 @@ func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.Nod
return true, nil
}
func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
ref := &v1.ObjectReference{
Kind: "Node",
Name: nodeName,
@ -275,7 +232,7 @@ func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype
recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}
func recordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, new_status string) {
func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, new_status string) {
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
@ -289,7 +246,7 @@ func recordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, new_st
}
// Returns true in case of success and false otherwise
func swapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintToRemove *v1.Taint, node *v1.Node) bool {
func SwapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintToRemove *v1.Taint, node *v1.Node) bool {
taintToAdd.TimeAdded = metav1.Now()
err := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintToAdd)
if err != nil {
@ -317,7 +274,7 @@ func swapNodeControllerTaint(kubeClient clientset.Interface, taintToAdd, taintTo
return true
}
func createAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
return func(originalObj interface{}) {
obj, err := scheme.Scheme.DeepCopy(originalObj)
if err != nil {
@ -332,7 +289,7 @@ func createAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
}
}
func createUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
return func(origOldObj, origNewObj interface{}) {
oldObj, err := scheme.Scheme.DeepCopy(origOldObj)
if err != nil {
@ -353,7 +310,7 @@ func createUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldOb
}
}
func createDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
return func(originalObj interface{}) {
obj, err := scheme.Scheme.DeepCopy(originalObj)
if err != nil {