Fix TODO in pkg/controller/nodecontroller.go line 472

The code currently recalculates the CIDRs for every node on each sync period. This
change fixes that TODO by maintaining a set of the CIDR indices that are still
available. The set is populated lazily the first time a CIDR is needed; when a node
is assigned a CIDR, that CIDR is removed from the set, and when a node is deleted,
its CIDR is reinserted into the set. As a result, we no longer need to generate
CIDRs on every monitor pass and only have to assign a CIDR to a node that does not
already have one.
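
A rough, standalone sketch of this bookkeeping, using a plain map in place of the
controller's sets.Int set (the cidrPool type and its method names below are
illustrative only, not part of this patch):

package main

import "fmt"

// cidrPool is an illustrative stand-in for the controller's availableCIDRs set:
// it tracks which CIDR indices are still free to hand out.
type cidrPool struct {
    free map[int]struct{}
}

// newCIDRPool seeds the pool with indices 0..n-1.
func newCIDRPool(n int) *cidrPool {
    p := &cidrPool{free: make(map[int]struct{}, n)}
    for i := 0; i < n; i++ {
        p.free[i] = struct{}{}
    }
    return p
}

// allocate pops an arbitrary free index, mirroring PopAny on the real set.
func (p *cidrPool) allocate() (int, bool) {
    for i := range p.free {
        delete(p.free, i)
        return i, true
    }
    return 0, false
}

// release puts an index back into the pool when the node holding it is deleted.
func (p *cidrPool) release(i int) {
    p.free[i] = struct{}{}
}

func main() {
    pool := newCIDRPool(4)
    idx, _ := pool.allocate() // a node is assigned this index
    fmt.Println("assigned index", idx)
    pool.release(idx) // the node is deleted, so its index becomes available again
}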

This commit also fixes the bug in generateCIDRs where a generated CIDR could
overflow an IP octet. Octets no longer overflow, so every assigned CIDR is valid.
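
A minimal sketch of the overflow-safe index-to-CIDR mapping (equivalent arithmetic,
not the exact code of this patch; it assumes an IPv4 cluster CIDR such as
10.244.0.0/16 handed out as /24 pod CIDRs):

package main

import (
    "fmt"
    "net"
)

// indexToCIDR maps a CIDR index onto a /24 inside the cluster CIDR. Working with a
// single offset in /24 units lets a large third octet carry into the second octet
// instead of overflowing past 255.
func indexToCIDR(clusterCIDR *net.IPNet, idx int) string {
    base := clusterCIDR.IP.To4()
    offset := int(base[1])*256 + int(base[2]) + idx // position in units of /24 blocks
    return fmt.Sprintf("%d.%d.%d.0/24", base[0], offset/256, offset%256)
}

func main() {
    _, cluster, _ := net.ParseCIDR("10.244.0.0/16")
    fmt.Println(indexToCIDR(cluster, 0))   // 10.244.0.0/24
    fmt.Println(indexToCIDR(cluster, 300)) // 10.245.44.0/24: the carry is handled, no octet overflow
}
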
Weixu Zhuang 2015-11-24 14:44:47 -08:00
parent 6aa3a74cf9
commit 3928bd6e76
1 changed file with 94 additions and 19 deletions


@@ -20,6 +20,8 @@ import (
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
@@ -111,6 +113,12 @@ type NodeController struct {
nodeStore cache.StoreToNodeLister
forcefullyDeletePod func(*api.Pod)
// availableCIDRs holds the indices of pod CIDRs that have not yet been assigned to a node.
availableCIDRs sets.Int
// needSync denotes whether the node controller has just started or restarted (after a crash)
// and still has to rebuild availableCIDRs from the existing nodes.
needSync bool
// maxCIDRs is the maximum number of CIDRs we could give out, based on nc.clusterCIDR.
maxCIDRs int
// generatedCIDR records whether availableCIDRs has already been populated.
generatedCIDR bool
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -158,6 +166,10 @@ func NewNodeController(
clusterCIDR: clusterCIDR,
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) { forcefullyDeletePod(kubeClient, p) },
availableCIDRs: make(sets.Int),
needSync: true,
maxCIDRs: 0,
generatedCIDR: false,
}
nc.podStore.Store, nc.podController = framework.NewInformer(
@@ -192,10 +204,26 @@ func NewNodeController(
return nc
}
// generateAvailableCIDRs populates the availableCIDRs set with the index of every
// pod CIDR that fits inside nc.clusterCIDR. It is expected to run only once, the
// first time a CIDR is needed.
func (nc *NodeController) generateAvailableCIDRs() {
nc.generatedCIDR = true
// Generate all available CIDR indices up front; there are at most 256*256 of them,
// so the set stays small and uses well under 1MB of memory.
cidrIP := nc.clusterCIDR.IP.To4()
nc.maxCIDRs = (256-int(cidrIP[1]))*256 - int(cidrIP[2])
for i := 0; i < nc.maxCIDRs; i++ {
nc.availableCIDRs.Insert(i)
}
}
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(period time.Duration) {
go nc.nodeController.Run(util.NeverStop)
go nc.podController.Run(util.NeverStop)
// Incorporate the results of node status pushed from kubelet to master.
go util.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
@@ -260,19 +288,34 @@
}, nodeEvictionPeriod, util.NeverStop)
}
// Generates num pod CIDRs that could be assigned to nodes.
func generateCIDRs(clusterCIDR *net.IPNet, num int) sets.String {
res := sets.NewString()
// translateCIDRs translates a pod CIDR index into the CIDR string that can be
// assigned to a node, handling octet overflow so the resulting CIDR is always valid.
func translateCIDRs(clusterCIDR *net.IPNet, num int) string {
cidrIP := clusterCIDR.IP.To4()
for i := 0; i < num; i++ {
// TODO: Make the CIDRs configurable.
b1 := byte(i >> 8)
b2 := byte(i % 256)
res.Insert(fmt.Sprintf("%d.%d.%d.0/24", cidrIP[0], cidrIP[1]+b1, cidrIP[2]+b2))
// TODO: Make the CIDRs configurable.
b1 := (num / 256) + int(cidrIP[1])
b2 := (num % 256) + int(cidrIP[2])
if b2 > 255 {
b2 = b2 % 256
b1 = b1 + 1
}
res := fmt.Sprintf("%d.%d.%d.0/24", cidrIP[0], b1, b2)
return res
}
// translateCIDRtoIndex translates a CIDR string back into its index in the
// availableCIDRs set, returning -1 if the CIDR cannot be parsed.
func (nc *NodeController) translateCIDRtoIndex(freeCIDR string) int {
CIDRsArray := strings.Split(freeCIDR, ".")
if len(CIDRsArray) < 3 {
return -1
}
cidrIP := nc.clusterCIDR.IP.To4()
CIDRsIndexOne, errOne := strconv.Atoi(CIDRsArray[1])
CIDRsIndexTwo, errTwo := strconv.Atoi(CIDRsArray[2])
if errOne != nil || errTwo != nil {
return -1
}
release := (CIDRsIndexOne-int(cidrIP[1]))*256 + CIDRsIndexTwo - int(cidrIP[2])
return release
}
// getCondition returns a condition object for the specific condition
// type, nil if the condition is not set.
func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
@@ -350,6 +393,17 @@ func forcefullyDeletePod(c client.Interface, pod *api.Pod) {
}
}
// releaseCIDR translates a CIDR string back into its index and reinserts the index
// into the availableCIDRs set so it can be assigned again.
func (nc *NodeController) releaseCIDR(freeCIDR string) {
release := nc.translateCIDRtoIndex(freeCIDR)
if release >= 0 && release < nc.maxCIDRs {
nc.availableCIDRs.Insert(release)
} else {
glog.V(4).Info("CIDR %s is invalid", freeCIDR)
}
}
// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
@@ -460,11 +514,14 @@ func (nc *NodeController) monitorNodeStatus() error {
nc.evictPods(node.Name)
continue
}
if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
assignedCIDR := node.Spec.PodCIDR
if err = nc.kubeClient.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete node %s: %v", node.Name, err)
continue
}
if assignedCIDR != "" {
nc.releaseCIDR(assignedCIDR)
}
}
}
}
@@ -474,25 +531,43 @@
// reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
// if it doesn't currently have one.
// It takes CIDRs from the availableCIDRs set, generating the full set first if it has not been populated yet.
func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
glog.V(4).Infof("Reconciling cidrs for %d nodes", len(nodes.Items))
// TODO(roberthbailey): This seems inefficient. Why re-calculate CIDRs
// on each sync period?
availableCIDRs := generateCIDRs(nc.clusterCIDR, len(nodes.Items))
for _, node := range nodes.Items {
if node.Spec.PodCIDR != "" {
glog.V(4).Infof("CIDR %s is already being used by node %s", node.Spec.PodCIDR, node.Name)
availableCIDRs.Delete(node.Spec.PodCIDR)
// Check whether the node controller has just started or restarted (e.g. after a crash).
// This block runs only once, on the first sync after startup.
if nc.needSync {
// Rebuild availableCIDRs: generate the full set of CIDR indices, then remove the
// indices that are already assigned to existing nodes.
if !nc.generatedCIDR {
nc.generateAvailableCIDRs()
}
for _, node := range nodes.Items {
if node.Spec.PodCIDR != "" {
if nc.availableCIDRs.Has(nc.translateCIDRtoIndex(node.Spec.PodCIDR)) {
nc.availableCIDRs.Delete(nc.translateCIDRtoIndex(node.Spec.PodCIDR))
} else {
glog.V(4).Info("Node %s CIDR error, its CIDR is invalid, will reassign CIDR", node.Name)
node.Spec.PodCIDR = ""
if _, err := nc.kubeClient.Nodes().Update(&node); err != nil {
nc.recordNodeStatusChange(&node, "CIDRAssignmentFailed")
}
continue
}
}
}
nc.needSync = false
}
for _, node := range nodes.Items {
if node.Spec.PodCIDR == "" {
podCIDR, found := availableCIDRs.PopAny()
CIDRsNum, found := nc.availableCIDRs.PopAny()
if !found && !nc.generatedCIDR {
nc.generateAvailableCIDRs()
CIDRsNum, found = nc.availableCIDRs.PopAny()
}
if !found {
nc.recordNodeStatusChange(&node, "CIDRNotAvailable")
continue
}
glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR)
podCIDR := translateCIDRs(nc.clusterCIDR, CIDRsNum)
glog.V(4).Info("Assigning node %s CIDR %s", node.Name, podCIDR)
node.Spec.PodCIDR = podCIDR
if _, err := nc.kubeClient.Nodes().Update(&node); err != nil {
nc.recordNodeStatusChange(&node, "CIDRAssignmentFailed")