Merge pull request #19242 from mqliang/node-controller

Automatic merge from submit-queue

add CIDR allocator for NodeController

This PR:

* uses pkg/controller/framework to watch nodes and reduce node list calls when allocating CIDRs for nodes (see the sketch below)
* decouples the CIDR allocation logic from the status monitoring logic
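A minimal, self-contained sketch of the resulting shape: node add/delete notifications, which the real controller receives through `framework.ResourceEventHandlerFuncs`, drive the CIDR bookkeeping, while status monitoring runs independently. The names `nodeEvent`, `handleNodeEvents`, and `fakeAllocator` below are hypothetical stand-ins, not code from this PR.

```go
package main

import (
    "fmt"
    "net"
)

// allocator mirrors the CIDRAllocator interface introduced by this PR.
type allocator interface {
    AllocateNext() (*net.IPNet, error)
    Occupy(*net.IPNet) error
    Release(*net.IPNet) error
}

// nodeEvent is a hypothetical stand-in for the informer's add/delete callbacks.
type nodeEvent struct {
    added   bool   // true = node added, false = node deleted
    name    string
    podCIDR string // pod CIDR already recorded on the node, "" if none
}

// handleNodeEvents drives CIDR bookkeeping purely from watch events,
// independently of the status-monitoring loop.
func handleNodeEvents(events <-chan nodeEvent, alloc allocator) {
    for ev := range events {
        switch {
        case ev.added && ev.podCIDR != "":
            // Node already carries a CIDR (e.g. after a controller restart): mark it used.
            if _, ipnet, err := net.ParseCIDR(ev.podCIDR); err == nil {
                alloc.Occupy(ipnet)
            }
        case ev.added:
            if cidr, err := alloc.AllocateNext(); err == nil {
                fmt.Printf("assign %v to node %s\n", cidr, ev.name)
            }
        case ev.podCIDR != "":
            // Node deleted: recycle its CIDR for reuse.
            if _, ipnet, err := net.ParseCIDR(ev.podCIDR); err == nil {
                alloc.Release(ipnet)
            }
        }
    }
}

// fakeAllocator hands out /24s from 10.244.0.0/16 without any bookkeeping;
// it exists only so this sketch compiles and runs on its own.
type fakeAllocator struct{ next int }

func (f *fakeAllocator) AllocateNext() (*net.IPNet, error) {
    _, ipnet, _ := net.ParseCIDR(fmt.Sprintf("10.244.%d.0/24", f.next))
    f.next++
    return ipnet, nil
}
func (f *fakeAllocator) Occupy(*net.IPNet) error  { return nil }
func (f *fakeAllocator) Release(*net.IPNet) error { return nil }

func main() {
    events := make(chan nodeEvent, 2)
    events <- nodeEvent{added: true, name: "node-a"}
    events <- nodeEvent{added: true, name: "node-b", podCIDR: "10.244.7.0/24"}
    close(events)
    handleNodeEvents(events, &fakeAllocator{})
}
```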

k8s-merge-robot 2016-05-20 09:45:05 -07:00
commit f935507235
16 changed files with 1119 additions and 456 deletions

View File

@ -483,6 +483,9 @@ start_kube_controller_manager() {
if [ -n "${CLUSTER_IP_RANGE:-}" ]; then
params="${params} --cluster-cidr=${CLUSTER_IP_RANGE}"
fi
if [ -n "${SERVICE_IP_RANGE:-}" ]; then
params="${params} --service-cluster-ip-range=${SERVICE_CLUSTER_IP_RANGE}"
fi
if [ "${ALLOCATE_NODE_CIDRS:-}" = "true" ]; then
params="${params} --allocate-node-cidrs=${ALLOCATE_NODE_CIDRS}"
fi

View File

@ -1,6 +1,7 @@
{% set cluster_name = "" -%}
{% set cluster_cidr = "" -%}
{% set allocate_node_cidrs = "" -%}
{% set service_cluster_ip_range = "" %}
{% set terminated_pod_gc = "" -%}
@ -10,6 +11,9 @@
{% if pillar['cluster_cidr'] is defined and pillar['cluster_cidr'] != "" -%}
{% set cluster_cidr = "--cluster-cidr=" + pillar['cluster_cidr'] -%}
{% endif -%}
{% if pillar['service_cluster_ip_range'] is defined and pillar['service_cluster_ip_range'] != "" -%}
{% set service_cluster_ip_range = "--service_cluster_ip_range=" + pillar['service_cluster_ip_range'] -%}
{% endif -%}
# When we're using flannel it is responsible for cidr allocation.
# This is expected to be a short-term compromise.
{% if pillar.get('network_provider', '').lower() == 'flannel' %}
@ -59,7 +63,7 @@
{% set log_level = pillar['controller_manager_test_log_level'] -%}
{% endif -%}
{% set params = "--master=127.0.0.1:8080" + " " + cluster_name + " " + cluster_cidr + " " + allocate_node_cidrs + " " + terminated_pod_gc + " " + cloud_provider + " " + cloud_config + " " + service_account_key + " " + log_level + " " + root_ca_file -%}
{% set params = "--master=127.0.0.1:8080" + " " + cluster_name + " " + cluster_cidr + " " + allocate_node_cidrs + " " + service_cluster_ip_range + " " + terminated_pod_gc + " " + cloud_provider + " " + cloud_config + " " + service_account_key + " " + log_level + " " + root_ca_file -%}
# test_args has to be kept at the end, so they'll overwrite any prior configuration

View File

@ -206,7 +206,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
go podInformer.Run(wait.NeverStop)
nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
40*time.Second, 60*time.Second, 5*time.Second, nil, nil, 0, false)
nodeController.Run(5 * time.Second)
cadvisorInterface := new(cadvisortest.Fake)

View File

@ -227,10 +227,11 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
// this cidr has been validated already
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs)
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

View File

@ -70,6 +70,7 @@ func NewCMServer() *CMServer {
NodeStartupGracePeriod: unversioned.Duration{Duration: 60 * time.Second},
NodeMonitorPeriod: unversioned.Duration{Duration: 5 * time.Second},
ClusterName: "kubernetes",
NodeCIDRMaskSize: 24,
TerminatedPodGCThreshold: 12500,
VolumeConfiguration: componentconfig.VolumeConfiguration{
EnableHostPathProvisioning: false,
@ -141,6 +142,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
fs.StringVar(&s.ClusterName, "cluster-name", s.ClusterName, "The instance prefix for the cluster")
fs.StringVar(&s.ClusterCIDR, "cluster-cidr", s.ClusterCIDR, "CIDR Range for Pods in cluster.")
fs.StringVar(&s.ServiceCIDR, "service-cluster-ip-range", s.ServiceCIDR, "CIDR Range for Services in cluster.")
fs.Int32Var(&s.NodeCIDRMaskSize, "node-cidr-mask-size", s.NodeCIDRMaskSize, "Mask size for node cidr in cluster.")
fs.BoolVar(&s.AllocateNodeCIDRs, "allocate-node-cidrs", false, "Should CIDRs for Pods be allocated and set on the cloud provider.")
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")

View File

@ -153,10 +153,11 @@ func (s *CMServer) Run(_ []string) error {
glog.Fatalf("Cloud provider could not be initialized: %v", err)
}
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs)
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)
nodeStatusUpdaterController := node.NewStatusUpdater(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-status-controller")), s.NodeMonitorPeriod.Duration, time.Now)

View File

@ -87,6 +87,7 @@ kube-controller-manager
--master="": The address of the Kubernetes API server (overrides any value in kubeconfig)
--min-resync-period=12h0m0s: The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod
--namespace-sync-period=5m0s: The period for syncing namespace life-cycle updates
--node-cidr-mask-size=24: Mask size for node cidr in cluster.
--node-monitor-grace-period=40s: Amount of time which we allow running Node to be unresponsive before marking it unhealthy. Must be N times more than kubelet's nodeStatusUpdateFrequency, where N means number of retries allowed for kubelet to post node status.
--node-monitor-period=5s: The period for syncing NodeStatus in NodeController.
--node-startup-grace-period=1m0s: Amount of time which we allow starting Node to be unresponsive before marking it unhealthy.
@ -106,11 +107,12 @@ kube-controller-manager
--resource-quota-sync-period=5m0s: The period for syncing quota usage status in the system
--root-ca-file="": If set, this root certificate authority will be included in service account's token secret. This must be a valid PEM-encoded CA bundle.
--service-account-private-key-file="": Filename containing a PEM-encoded private RSA key used to sign service account tokens.
--service-cluster-ip-range="": CIDR Range for Services in cluster.
--service-sync-period=5m0s: The period for syncing services with their external load balancers
--terminated-pod-gc-threshold=12500: Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.
```
###### Auto generated by spf13/cobra on 21-Apr-2016
###### Auto generated by spf13/cobra on 17-May-2016
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@ -34,7 +34,7 @@ cluster/photon-controller/util.sh: node_name=${1}
cluster/rackspace/util.sh: local node_ip=$(nova show --minimal ${NODE_NAMES[$i]} \
cluster/saltbase/salt/kube-admission-controls/init.sls:{% if 'LimitRanger' in pillar.get('admission_control', '') %}
cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest:{% set params = address + " " + etcd_servers + " " + etcd_servers_overrides + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + basic_auth_file + " " + min_request_timeout -%}
cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest:{% set params = "--master=127.0.0.1:8080" + " " + cluster_name + " " + cluster_cidr + " " + allocate_node_cidrs + " " + terminated_pod_gc + " " + cloud_provider + " " + cloud_config + " " + service_account_key + " " + log_level + " " + root_ca_file -%}
cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest:{% set params = "--master=127.0.0.1:8080" + " " + cluster_name + " " + cluster_cidr + " " + allocate_node_cidrs + " " + service_cluster_ip_range + " " + terminated_pod_gc + " " + cloud_provider + " " + cloud_config + " " + service_account_key + " " + log_level + " " + root_ca_file -%}
cluster/saltbase/salt/kube-proxy/kube-proxy.manifest: {% set api_servers_with_port = api_servers + ":6443" -%}
cluster/saltbase/salt/kube-proxy/kube-proxy.manifest: {% set api_servers_with_port = api_servers -%}
cluster/saltbase/salt/kube-proxy/kube-proxy.manifest: {% set cluster_cidr=" --cluster-cidr=" + pillar['cluster_cidr'] %}

View File

@ -287,6 +287,7 @@ network-plugin
network-plugin-dir
no-headers
no-suggestions
node-cidr-mask-size
node-instance-group
node-ip
node-labels

View File

@ -117,6 +117,8 @@ func DeepCopy_componentconfig_KubeControllerManagerConfiguration(in KubeControll
out.EnableProfiling = in.EnableProfiling
out.ClusterName = in.ClusterName
out.ClusterCIDR = in.ClusterCIDR
out.ServiceCIDR = in.ServiceCIDR
out.NodeCIDRMaskSize = in.NodeCIDRMaskSize
out.AllocateNodeCIDRs = in.AllocateNodeCIDRs
out.RootCAFile = in.RootCAFile
out.ContentType = in.ContentType

File diff suppressed because it is too large.

View File

@ -522,6 +522,10 @@ type KubeControllerManagerConfiguration struct {
ClusterName string `json:"clusterName"`
// clusterCIDR is CIDR Range for Pods in cluster.
ClusterCIDR string `json:"clusterCIDR"`
// serviceCIDR is CIDR Range for Services in cluster.
ServiceCIDR string `json:"serviceCIDR"`
// NodeCIDRMaskSize is the mask size for node cidr in cluster.
NodeCIDRMaskSize int32 `json:"nodeCIDRMaskSize"`
// allocateNodeCIDRs enables CIDRs for Pods to be allocated and set on the
// cloud provider.
AllocateNodeCIDRs bool `json:"allocateNodeCIDRs"`

View File

@ -0,0 +1,157 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"encoding/binary"
"errors"
"fmt"
"math/big"
"net"
"sync"
)
var errCIDRRangeNoCIDRsRemaining = errors.New("CIDR allocation failed; there are no remaining CIDRs left to allocate in the accepted range")
// CIDRAllocator is an interface implemented by things that know how to allocate/occupy/recycle CIDRs for nodes.
type CIDRAllocator interface {
AllocateNext() (*net.IPNet, error)
Occupy(*net.IPNet) error
Release(*net.IPNet) error
}
type rangeAllocator struct {
clusterCIDR *net.IPNet
clusterIP net.IP
clusterMaskSize int
subNetMaskSize int
maxCIDRs int
used big.Int
lock sync.Mutex
}
// NewCIDRRangeAllocator returns a CIDRAllocator that allocates CIDRs for nodes.
// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
func NewCIDRRangeAllocator(clusterCIDR *net.IPNet, subNetMaskSize int) CIDRAllocator {
clusterMask := clusterCIDR.Mask
clusterMaskSize, _ := clusterMask.Size()
ra := &rangeAllocator{
clusterCIDR: clusterCIDR,
clusterIP: clusterCIDR.IP.To4(),
clusterMaskSize: clusterMaskSize,
subNetMaskSize: subNetMaskSize,
maxCIDRs: 1 << uint32(subNetMaskSize-clusterMaskSize),
}
return ra
}
func (r *rangeAllocator) AllocateNext() (*net.IPNet, error) {
r.lock.Lock()
defer r.lock.Unlock()
nextUnused := -1
for i := 0; i < r.maxCIDRs; i++ {
if r.used.Bit(i) == 0 {
nextUnused = i
break
}
}
if nextUnused == -1 {
return nil, errCIDRRangeNoCIDRsRemaining
}
r.used.SetBit(&r.used, nextUnused, 1)
j := uint32(nextUnused) << uint32(32-r.subNetMaskSize)
ipInt := (binary.BigEndian.Uint32(r.clusterIP)) | j
ip := make([]byte, 4)
binary.BigEndian.PutUint32(ip, ipInt)
return &net.IPNet{
IP: ip,
Mask: net.CIDRMask(r.subNetMaskSize, 32),
}, nil
}
func (r *rangeAllocator) Release(cidr *net.IPNet) error {
used, err := r.getIndexForCIDR(cidr)
if err != nil {
return err
}
r.lock.Lock()
defer r.lock.Unlock()
r.used.SetBit(&r.used, used, 0)
return nil
}
func (r *rangeAllocator) MaxCIDRs() int {
return r.maxCIDRs
}
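// Occupy marks every node subnet that overlaps the given CIDR as allocated.
// If the CIDR is narrower than the whole cluster CIDR, only the covered block
// of indices [begin, end] is marked; otherwise the entire bitmap is marked.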
func (r *rangeAllocator) Occupy(cidr *net.IPNet) (err error) {
begin, end := 0, r.maxCIDRs
cidrMask := cidr.Mask
maskSize, _ := cidrMask.Size()
if !r.clusterCIDR.Contains(cidr.IP.Mask(r.clusterCIDR.Mask)) && !cidr.Contains(r.clusterCIDR.IP.Mask(cidr.Mask)) {
return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, r.clusterCIDR)
}
if r.clusterMaskSize < maskSize {
subNetMask := net.CIDRMask(r.subNetMaskSize, 32)
begin, err = r.getIndexForCIDR(&net.IPNet{
IP: cidr.IP.To4().Mask(subNetMask),
Mask: subNetMask,
})
if err != nil {
return err
}
ip := make([]byte, 4)
ipInt := binary.BigEndian.Uint32(cidr.IP) | (^binary.BigEndian.Uint32(cidr.Mask))
binary.BigEndian.PutUint32(ip, ipInt)
end, err = r.getIndexForCIDR(&net.IPNet{
IP: net.IP(ip).To4().Mask(subNetMask),
Mask: subNetMask,
})
if err != nil {
return err
}
}
r.lock.Lock()
defer r.lock.Unlock()
for i := begin; i <= end; i++ {
r.used.SetBit(&r.used, i, 1)
}
return nil
}
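// getIndexForCIDR maps a node subnet to its bit position in the used bitmap:
// XOR-ing the subnet address with the cluster address isolates the subnet-number
// bits, and shifting right by (32 - subNetMaskSize) drops the host bits. For
// example, with cluster CIDR 192.168.0.0/16 and subNetMaskSize 24, the subnet
// 192.168.12.0/24 maps to index 12.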
func (r *rangeAllocator) getIndexForCIDR(cidr *net.IPNet) (int, error) {
cidrIndex := (binary.BigEndian.Uint32(r.clusterIP) ^ binary.BigEndian.Uint32(cidr.IP.To4())) >> uint32(32-r.subNetMaskSize)
if cidrIndex >= uint32(r.maxCIDRs) {
return 0, fmt.Errorf("CIDR: %v is out of the range of CIDR allocator", cidr)
}
return int(cidrIndex), nil
}
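A brief usage sketch for the allocator above. The import path is assumed to be the node controller package (`k8s.io/kubernetes/pkg/controller/node`) and may differ between releases; the reserved service range mirrors what `NodeController.filterOutServiceRange` does further down in this PR.

```go
package main

import (
    "fmt"
    "net"

    "k8s.io/kubernetes/pkg/controller/node"
)

func main() {
    // Cluster pod range and per-node mask size (--cluster-cidr, --node-cidr-mask-size).
    _, clusterCIDR, _ := net.ParseCIDR("10.244.0.0/16")
    alloc := node.NewCIDRRangeAllocator(clusterCIDR, 24) // 1<<(24-16) = 256 node CIDRs

    // Reserve the service range so node CIDRs never overlap it.
    _, serviceCIDR, _ := net.ParseCIDR("10.244.0.0/20")
    if err := alloc.Occupy(serviceCIDR); err != nil {
        fmt.Println("occupying service range:", err)
    }

    // First free /24 after the reserved block.
    cidr, err := alloc.AllocateNext()
    if err != nil {
        panic(err)
    }
    fmt.Println(cidr) // 10.244.16.0/24

    // When the node is deleted, its CIDR goes back into the pool.
    alloc.Release(cidr)
}
```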

View File

@ -0,0 +1,350 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"github.com/golang/glog"
"math/big"
"net"
"reflect"
"testing"
)
func TestRangeAllocatorFullyAllocated(t *testing.T) {
_, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/30")
a := NewCIDRRangeAllocator(clusterCIDR, 30)
p, err := a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if p.String() != "127.123.234.0/30" {
t.Fatalf("unexpected allocated cidr: %s", p.String())
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
a.Release(p)
p, err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if p.String() != "127.123.234.0/30" {
t.Fatalf("unexpected allocated cidr: %s", p.String())
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
}
func TestRangeAllocator_RandomishAllocation(t *testing.T) {
_, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16")
a := NewCIDRRangeAllocator(clusterCIDR, 24)
// allocate all the CIDRs
var err error
cidrs := make([]*net.IPNet, 256)
for i := 0; i < 256; i++ {
cidrs[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
// release them all
for i := 0; i < 256; i++ {
a.Release(cidrs[i])
}
// allocate the CIDRs again
rcidrs := make([]*net.IPNet, 256)
for i := 0; i < 256; i++ {
rcidrs[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %d, %v", i, err)
}
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
if !reflect.DeepEqual(cidrs, rcidrs) {
t.Fatalf("expected re-allocated cidrs are the same collection")
}
}
func TestRangeAllocator_AllocationOccupied(t *testing.T) {
_, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/16")
a := NewCIDRRangeAllocator(clusterCIDR, 24)
// allocate all the CIDRs
var err error
cidrs := make([]*net.IPNet, 256)
for i := 0; i < 256; i++ {
cidrs[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
// release them all
for i := 0; i < 256; i++ {
a.Release(cidrs[i])
}
// occupy the last 128 CIDRs
for i := 128; i < 256; i++ {
a.Occupy(cidrs[i])
}
// allocate the first 128 CIDRs again
rcidrs := make([]*net.IPNet, 128)
for i := 0; i < 128; i++ {
rcidrs[i], err = a.AllocateNext()
if err != nil {
t.Fatalf("unexpected error: %d, %v", i, err)
}
}
_, err = a.AllocateNext()
if err == nil {
t.Fatalf("expected error because of fully-allocated range")
}
// check that Occupy() works properly
for i := 128; i < 256; i++ {
rcidrs = append(rcidrs, cidrs[i])
}
if !reflect.DeepEqual(cidrs, rcidrs) {
t.Fatalf("expected re-allocated cidrs are the same collection")
}
}
func TestGetBitforCIDR(t *testing.T) {
cases := []struct {
clusterCIDRStr string
subNetMaskSize int
subNetCIDRStr string
expectedBit int
expectErr bool
}{
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.0.0.0/16",
expectedBit: 0,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.123.0.0/16",
expectedBit: 123,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.168.0.0/16",
expectedBit: 168,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.224.0.0/16",
expectedBit: 224,
expectErr: false,
},
{
clusterCIDRStr: "192.168.0.0/16",
subNetMaskSize: 24,
subNetCIDRStr: "192.168.12.0/24",
expectedBit: 12,
expectErr: false,
},
{
clusterCIDRStr: "192.168.0.0/16",
subNetMaskSize: 24,
subNetCIDRStr: "192.168.151.0/24",
expectedBit: 151,
expectErr: false,
},
{
clusterCIDRStr: "192.168.0.0/16",
subNetMaskSize: 24,
subNetCIDRStr: "127.168.224.0/24",
expectErr: true,
},
}
for _, tc := range cases {
_, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr)
clusterMask := clusterCIDR.Mask
clusterMaskSize, _ := clusterMask.Size()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ra := &rangeAllocator{
clusterIP: clusterCIDR.IP.To4(),
clusterMaskSize: clusterMaskSize,
subNetMaskSize: tc.subNetMaskSize,
maxCIDRs: 1 << uint32(tc.subNetMaskSize-clusterMaskSize),
}
_, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
got, err := ra.getIndexForCIDR(subnetCIDR)
if err == nil && tc.expectErr {
glog.Errorf("expected error but got null")
continue
}
if err != nil && !tc.expectErr {
glog.Errorf("unexpected error: %v", err)
continue
}
if got != tc.expectedBit {
glog.Errorf("expected %v, but got %v", tc.expectedBit, got)
}
}
}
func TestOccupy(t *testing.T) {
cases := []struct {
clusterCIDRStr string
subNetMaskSize int
subNetCIDRStr string
expectedUsedBegin int
expectedUsedEnd int
expectErr bool
}{
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.0.0.0/8",
expectedUsedBegin: 0,
expectedUsedEnd: 256,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.0.0.0/2",
expectedUsedBegin: 0,
expectedUsedEnd: 256,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 16,
subNetCIDRStr: "127.0.0.0/16",
expectedUsedBegin: 0,
expectedUsedEnd: 0,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/8",
subNetMaskSize: 32,
subNetCIDRStr: "127.0.0.0/16",
expectedUsedBegin: 0,
expectedUsedEnd: 65535,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/7",
subNetMaskSize: 16,
subNetCIDRStr: "127.0.0.0/15",
expectedUsedBegin: 256,
expectedUsedEnd: 257,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/7",
subNetMaskSize: 15,
subNetCIDRStr: "127.0.0.0/15",
expectedUsedBegin: 128,
expectedUsedEnd: 128,
expectErr: false,
},
{
clusterCIDRStr: "127.0.0.0/7",
subNetMaskSize: 18,
subNetCIDRStr: "127.0.0.0/15",
expectedUsedBegin: 1024,
expectedUsedEnd: 1031,
expectErr: false,
},
}
for _, tc := range cases {
_, clusterCIDR, err := net.ParseCIDR(tc.clusterCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
clusterMask := clusterCIDR.Mask
clusterMaskSize, _ := clusterMask.Size()
ra := &rangeAllocator{
clusterCIDR: clusterCIDR,
clusterIP: clusterCIDR.IP.To4(),
clusterMaskSize: clusterMaskSize,
subNetMaskSize: tc.subNetMaskSize,
maxCIDRs: 1 << uint32(tc.subNetMaskSize-clusterMaskSize),
}
_, subnetCIDR, err := net.ParseCIDR(tc.subNetCIDRStr)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
err = ra.Occupy(subnetCIDR)
if err == nil && tc.expectErr {
t.Errorf("expected error but got none")
continue
}
if err != nil && !tc.expectErr {
t.Errorf("unexpected error: %v", err)
continue
}
expectedUsed := big.Int{}
for i := tc.expectedUsedBegin; i <= tc.expectedUsedEnd; i++ {
expectedUsed.SetBit(&expectedUsed, i, 1)
}
if expectedUsed.Cmp(&ra.used) != 0 {
t.Errorf("error")
}
}
}
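For readers tracing the expected values in `TestGetBitforCIDR` and `TestOccupy`, the snippet below reproduces the allocator's index arithmetic on its own. `index` is a hypothetical helper, not part of the PR; it simply mirrors `getIndexForCIDR` for IPv4.

```go
package main

import (
    "encoding/binary"
    "fmt"
    "net"
)

// index reproduces rangeAllocator.getIndexForCIDR:
// (clusterIP XOR subnetIP) >> (32 - subNetMaskSize).
func index(clusterCIDRStr, subnetCIDRStr string, subNetMaskSize int) uint32 {
    _, clusterCIDR, _ := net.ParseCIDR(clusterCIDRStr) // IP is masked to the network address
    _, subnetCIDR, _ := net.ParseCIDR(subnetCIDRStr)
    cluster := binary.BigEndian.Uint32(clusterCIDR.IP.To4())
    subnet := binary.BigEndian.Uint32(subnetCIDR.IP.To4())
    return (cluster ^ subnet) >> uint32(32-subNetMaskSize)
}

func main() {
    fmt.Println(index("127.0.0.0/8", "127.123.0.0/16", 16))      // 123
    fmt.Println(index("192.168.0.0/16", "192.168.151.0/24", 24)) // 151

    // The "127.0.0.0/7" + "127.0.0.0/15" + mask-size-16 case in TestOccupy:
    // the first /16 covered is 127.0.0.0/16 (index 256) and the last is
    // 127.1.0.0/16 (index 257), matching expectedUsedBegin/expectedUsedEnd.
    fmt.Println(index("127.0.0.0/7", "127.0.0.0/16", 16)) // 256
    fmt.Println(index("127.0.0.0/7", "127.1.0.0/16", 16)) // 257
}
```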

View File

@ -57,6 +57,8 @@ var (
const (
// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
nodeStatusUpdateRetry = 5
// podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update.
podCIDRUpdateRetry = 5
// controls how often NodeController will try to evict Pods from non-responsive Nodes.
nodeEvictionPeriod = 100 * time.Millisecond
)
@ -71,6 +73,7 @@ type NodeController struct {
allocateNodeCIDRs bool
cloud cloudprovider.Interface
clusterCIDR *net.IPNet
serviceCIDR *net.IPNet
deletingPodsRateLimiter flowcontrol.RateLimiter
knownNodeSet sets.String
kubeClient clientset.Interface
@ -121,6 +124,8 @@ type NodeController struct {
// DaemonSet framework and store
daemonSetController *framework.Controller
daemonSetStore cache.StoreToDaemonSetLister
// allocate/recycle CIDRs for node if allocateNodeCIDRs == true
cidrAllocator CIDRAllocator
forcefullyDeletePod func(*api.Pod) error
nodeExistsInCloudProvider func(string) (bool, error)
@ -142,6 +147,8 @@ func NewNodeController(
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
clusterCIDR *net.IPNet,
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool) *NodeController {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
@ -157,8 +164,14 @@ func NewNodeController(
metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
if allocateNodeCIDRs && clusterCIDR == nil {
glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
if allocateNodeCIDRs {
if clusterCIDR == nil {
glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
}
mask := clusterCIDR.Mask
if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize {
glog.Fatal("NodeController: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.")
}
}
evictorLock := sync.Mutex{}
@ -179,6 +192,7 @@ func NewNodeController(
lookupIP: net.LookupIP,
now: unversioned.Now,
clusterCIDR: clusterCIDR,
serviceCIDR: serviceCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
@ -204,6 +218,15 @@ func NewNodeController(
// they'll get the benefits they expect. It will also reserve the name for future refactorings.
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{}
if nc.allocateNodeCIDRs {
nodeEventHandlerFuncs = framework.ResourceEventHandlerFuncs{
AddFunc: nc.allocateOrOccupyCIDR,
DeleteFunc: nc.recycleCIDR,
}
}
nc.nodeStore.Store, nc.nodeController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@ -215,8 +238,9 @@ func NewNodeController(
},
&api.Node{},
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{},
nodeEventHandlerFuncs,
)
nc.daemonSetStore.Store, nc.daemonSetController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@ -230,11 +254,20 @@ func NewNodeController(
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{},
)
if allocateNodeCIDRs {
nc.cidrAllocator = NewCIDRRangeAllocator(clusterCIDR, nodeCIDRMaskSize)
}
return nc
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(period time.Duration) {
if nc.allocateNodeCIDRs {
nc.filterOutServiceRange()
}
go nc.nodeController.Run(wait.NeverStop)
go nc.podController.Run(wait.NeverStop)
go nc.daemonSetController.Run(wait.NeverStop)
@ -305,17 +338,78 @@ func (nc *NodeController) Run(period time.Duration) {
go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
}
// Generates num pod CIDRs that could be assigned to nodes.
func generateCIDRs(clusterCIDR *net.IPNet, num int) sets.String {
res := sets.NewString()
cidrIP := clusterCIDR.IP.To4()
for i := 0; i < num; i++ {
// TODO: Make the CIDRs configurable.
b1 := byte(i >> 8)
b2 := byte(i % 256)
res.Insert(fmt.Sprintf("%d.%d.%d.0/24", cidrIP[0], cidrIP[1]+b1, cidrIP[2]+b2))
func (nc *NodeController) filterOutServiceRange() {
if !nc.clusterCIDR.Contains(nc.serviceCIDR.IP.Mask(nc.clusterCIDR.Mask)) && !nc.serviceCIDR.Contains(nc.clusterCIDR.IP.Mask(nc.serviceCIDR.Mask)) {
return
}
if err := nc.cidrAllocator.Occupy(nc.serviceCIDR); err != nil {
glog.Errorf("Error filtering out service cidr: %v", err)
}
}
// allocateOrOccupyCIDR looks at each new observed node, assigns it a valid CIDR
// if it doesn't currently have one, or marks the CIDR as used if the node already has one.
func (nc *NodeController) allocateOrOccupyCIDR(obj interface{}) {
node := obj.(*api.Node)
if node.Spec.PodCIDR != "" {
_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
if err != nil {
glog.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
return
}
if err := nc.cidrAllocator.Occupy(podCIDR); err != nil {
glog.Errorf("failed to mark cidr as occupied :%v", err)
return
}
return
}
podCIDR, err := nc.cidrAllocator.AllocateNext()
if err != nil {
nc.recordNodeStatusChange(node, "CIDRNotAvailable")
return
}
glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
for rep := 0; rep < podCIDRUpdateRetry; rep++ {
node.Spec.PodCIDR = podCIDR.String()
if _, err = nc.kubeClient.Core().Nodes().Update(node); err == nil {
break
}
glog.Errorf("Failed while updating Node.Spec.PodCIDR: %v", err)
node, err = nc.kubeClient.Core().Nodes().Get(node.Name)
if err != nil {
glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", node.Name, err)
break
}
}
if err != nil {
glog.Errorf("Update PodCIDR of node %v from NodeController exceeds retry count.", node.Name)
nc.recordNodeStatusChange(node, "CIDRAssignmentFailed")
glog.Errorf("CIDR assignment for node %v failed: %v", node.Name, err)
}
}
// recycleCIDR recycles the CIDR of a removed node
func (nc *NodeController) recycleCIDR(obj interface{}) {
node := obj.(*api.Node)
if node.Spec.PodCIDR == "" {
return
}
_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
if err != nil {
glog.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
return
}
glog.V(4).Infof("recycle node %s CIDR %s", node.Name, podCIDR)
if err := nc.cidrAllocator.Release(podCIDR); err != nil {
glog.Errorf("failed to release cidr: %v", err)
}
return res
}
// getCondition returns a condition object for the specific condition
@ -450,11 +544,6 @@ func (nc *NodeController) monitorNodeStatus() error {
}
}
if nc.allocateNodeCIDRs {
// TODO (cjcullen): Use pkg/controller/framework to watch nodes and
// reduce lists/decouple this from monitoring status.
nc.reconcileNodeCIDRs(nodes)
}
seenReady := false
for i := range nodes.Items {
var gracePeriod time.Duration
@ -590,42 +679,6 @@ func (nc *NodeController) forcefullyDeleteNode(nodeName string) error {
return nil
}
// reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
// if it doesn't currently have one.
func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
glog.V(4).Infof("Reconciling cidrs for %d nodes", len(nodes.Items))
// TODO(roberthbailey): This seems inefficient. Why re-calculate CIDRs
// on each sync period?
availableCIDRs := generateCIDRs(nc.clusterCIDR, len(nodes.Items))
for _, node := range nodes.Items {
if node.Spec.PodCIDR != "" {
glog.V(4).Infof("CIDR %s is already being used by node %s", node.Spec.PodCIDR, node.Name)
availableCIDRs.Delete(node.Spec.PodCIDR)
}
}
for _, node := range nodes.Items {
if node.Spec.PodCIDR == "" {
// Re-GET node (because ours might be stale by now).
n, err := nc.kubeClient.Core().Nodes().Get(node.Name)
if err != nil {
glog.Errorf("Failed to get node %q: %v", node.Name, err)
continue
}
podCIDR, found := availableCIDRs.PopAny()
if !found {
nc.recordNodeStatusChange(n, "CIDRNotAvailable")
continue
}
glog.V(1).Infof("Assigning node %s CIDR %s", n.Name, podCIDR)
n.Spec.PodCIDR = podCIDR
if _, err := nc.kubeClient.Core().Nodes().Update(n); err != nil {
nc.recordNodeStatusChange(&node, "CIDRAssignmentFailed")
}
}
}
}
func (nc *NodeController) recordNodeEvent(nodeName, eventtype, reason, event string) {
ref := &api.ObjectReference{
Kind: "Node",

View File

@ -660,7 +660,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler,
evictionTimeout, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
for _, ds := range item.daemonSets {
nodeController.daemonSetStore.Add(&ds)
@ -731,7 +731,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
nodeController := NewNodeController(nil, fnh, 10*time.Minute,
flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
testNodeMonitorPeriod, nil, false)
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.now = func() unversioned.Time { return unversioned.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
nodeController.nodeExistsInCloudProvider = func(nodeName string) (bool, error) {
@ -963,7 +963,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@ -1113,7 +1113,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
for i, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(),
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
flowcontrol.NewFakeAlwaysRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err)
@ -1195,7 +1195,7 @@ func TestNodeDeletion(t *testing.T) {
}
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() unversioned.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@ -1298,7 +1298,7 @@ func TestCheckPod(t *testing.T) {
},
}
nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, false)
nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
nc.nodeStore.Store.Add(&api.Node{
ObjectMeta: api.ObjectMeta{
@ -1375,7 +1375,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
newPod("b", "bar"),
newPod("c", "gone"),
}
nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, false)
nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store.Add(newNode("foo"))
nc.nodeStore.Store.Add(newNode("bar"))