/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ipam

import (
	"fmt"
	"net"
	"sync"

	"github.com/golang/glog"

	"k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	informers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
	utilnode "k8s.io/kubernetes/pkg/util/node"
)

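// rangeAllocator hands out pod CIDRs to nodes from a single cluster CIDR,
// tracking which sub-ranges are already in use via a cidrset.CidrSet.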
type rangeAllocator struct {
	client      clientset.Interface
	cidrs       *cidrset.CidrSet
	clusterCIDR *net.IPNet
	maxCIDRs    int

	// nodeLister is able to list/get nodes and is populated by the shared informer passed to
	// NewCIDRRangeAllocator.
	nodeLister corelisters.NodeLister
	// nodesSynced returns true if the node shared informer has been synced at least once.
	nodesSynced cache.InformerSynced

	// Channel that is used to pass updated Nodes with their assigned CIDRs to the background workers.
	// This increases the throughput of CIDR assignment by not blocking on long operations.
	nodeCIDRUpdateChannel chan nodeAndCIDR
	recorder              record.EventRecorder

	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation.
	lock              sync.Mutex
	nodesInProcessing sets.String
}

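// Illustrative wiring sketch (not part of the original file): this is roughly
// how the node controller might construct and start the allocator. Names such
// as kubeClient, nodeInformer, clusterCIDR, serviceCIDR, nodeList, and stopCh
// are assumed to come from the caller, and 24 is just an example subNetMaskSize:
//
//	ra, err := NewCIDRRangeAllocator(kubeClient, nodeInformer, clusterCIDR, serviceCIDR, 24, nodeList)
//	if err != nil {
//		return err
//	}
//	go ra.Run(stopCh)
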
// NewCIDRRangeAllocator returns a CIDRAllocator that allocates CIDRs for nodes.
// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
// Caller must always pass in a list of existing nodes so the new allocator
// can initialize its CIDR map. NodeList is only nil in testing.
func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.NodeInformer, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
	if client == nil {
		glog.Fatalf("kubeClient is nil when starting NodeController")
	}

	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
	eventBroadcaster.StartLogging(glog.Infof)
	glog.V(0).Infof("Sending events to api server.")
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	set, err := cidrset.NewCIDRSet(clusterCIDR, subNetMaskSize)
	if err != nil {
		return nil, err
	}
	ra := &rangeAllocator{
		client:                client,
		cidrs:                 set,
		clusterCIDR:           clusterCIDR,
		nodeLister:            nodeInformer.Lister(),
		nodesSynced:           nodeInformer.Informer().HasSynced,
		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
		recorder:              recorder,
		nodesInProcessing:     sets.NewString(),
	}

	if serviceCIDR != nil {
		ra.filterOutServiceRange(serviceCIDR)
	} else {
		glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
	}

	if nodeList != nil {
		for _, node := range nodeList.Items {
			if node.Spec.PodCIDR == "" {
				glog.Infof("Node %v has no CIDR, ignoring", node.Name)
				continue
			} else {
				glog.Infof("Node %v has CIDR %s, occupying it in CIDR map",
					node.Name, node.Spec.PodCIDR)
			}
			if err := ra.occupyCIDR(&node); err != nil {
				// This will happen if:
				// 1. We find garbage in the podCIDR field. Retrying is useless.
				// 2. CIDR out of range: This means a node CIDR has changed.
				// This error will keep crashing the controller-manager.
				return nil, err
			}
		}
	}

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: nodeutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
		UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
			// If the PodCIDR is not empty we either:
			// - already processed a Node that already had a CIDR after NC restarted
			//   (cidr is marked as used),
			// - already processed a Node successfully and allocated a CIDR for it
			//   (cidr is marked as used),
			// - already processed a Node but saw a "timeout" response and the
			//   request eventually got through; in this case we haven't released
			//   the allocated CIDR (cidr is still marked as used).
			// There's a possible error here:
			// - NC sees a new Node and assigns CIDR X to it,
			// - the Update Node call fails with a timeout,
			// - the Node is updated by some other component, NC sees the update and
			//   assigns CIDR Y to the Node,
			// - both CIDR X and CIDR Y are marked as used in the local cache,
			//   even though the Node sees only CIDR Y.
			// The problem here is that CIDR X is marked as used in the in-memory cache,
			// which prevents it from being assigned to any new node. The cluster
			// state is correct.
			// Restart of NC fixes the issue.
			if newNode.Spec.PodCIDR == "" {
				return ra.AllocateOrOccupyCIDR(newNode)
			}
			return nil
		}),
		DeleteFunc: nodeutil.CreateDeleteNodeHandler(ra.ReleaseCIDR),
	})

	return ra, nil
}

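// Run starts the CIDR allocator: it waits for the node informer cache to sync,
// launches the background workers that push allocated CIDRs to the API server,
// and blocks until stopCh is closed.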
func (r *rangeAllocator) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	glog.Infof("Starting range CIDR allocator")
	defer glog.Infof("Shutting down range CIDR allocator")

	if !controller.WaitForCacheSync("cidrallocator", stopCh, r.nodesSynced) {
		return
	}

	for i := 0; i < cidrUpdateWorkers; i++ {
		go r.worker(stopCh)
	}

	<-stopCh
}

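// worker consumes items from nodeCIDRUpdateChannel and applies each allocated
// CIDR to its node via the API server, requeueing the item on failure, until
// stopChan is closed.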
func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
	for {
		select {
		case workItem, ok := <-r.nodeCIDRUpdateChannel:
			if !ok {
				glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
				return
			}
			if err := r.updateCIDRAllocation(workItem); err != nil {
				// Requeue the failed node for another update attempt.
				r.nodeCIDRUpdateChannel <- workItem
			}
		case <-stopChan:
			return
		}
	}
}

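// insertNodeToProcessing marks nodeName as being processed. It returns false
// if the node is already being processed, in which case the caller must not
// start another CIDR operation for it.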
func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.nodesInProcessing.Has(nodeName) {
		return false
	}
	r.nodesInProcessing.Insert(nodeName)
	return true
}

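// removeNodeFromProcessing clears the in-processing mark for nodeName.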
func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.nodesInProcessing.Delete(nodeName)
}

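// occupyCIDR marks the CIDR already recorded in node.Spec.PodCIDR as used in
// the local CIDR set, so that it cannot be handed out to another node.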
func (r *rangeAllocator) occupyCIDR(node *v1.Node) error {
	defer r.removeNodeFromProcessing(node.Name)
	if node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
	}
	if err := r.cidrs.Occupy(podCIDR); err != nil {
		return fmt.Errorf("failed to mark cidr as occupied: %v", err)
	}
	return nil
}

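// AllocateOrOccupyCIDR either marks the node's existing Pod CIDR as used, or
// allocates a new CIDR and hands it to a background worker, which patches the
// node object via the API server.
//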
// WARNING: If you're adding any return paths or deferring any more work from this
// function, you have to make sure to update nodesInProcessing properly with the
// disposition of the node when the work is done.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
	if node == nil {
		return nil
	}
	if !r.insertNodeToProcessing(node.Name) {
		glog.V(2).Infof("Node %v is already in the process of CIDR assignment.", node.Name)
		return nil
	}
	if node.Spec.PodCIDR != "" {
		return r.occupyCIDR(node)
	}
	podCIDR, err := r.cidrs.AllocateNext()
	if err != nil {
		r.removeNodeFromProcessing(node.Name)
		nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
		return fmt.Errorf("failed to allocate cidr: %v", err)
	}

	glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
		nodeName: node.Name,
		cidr:     podCIDR,
	}
	return nil
}

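// ReleaseCIDR returns the CIDR recorded in node.Spec.PodCIDR to the pool of
// assignable CIDRs.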
func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error {
	if node == nil || node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("Failed to parse CIDR %s on Node %v: %v", node.Spec.PodCIDR, node.Name, err)
	}

	glog.V(4).Infof("release CIDR %s", node.Spec.PodCIDR)
	if err = r.cidrs.Release(podCIDR); err != nil {
		return fmt.Errorf("Error when releasing CIDR %v: %v", node.Spec.PodCIDR, err)
	}
	return err
}

// filterOutServiceRange marks all CIDRs with subNetMaskSize that belong to
// serviceCIDR as used, so that they won't be assignable.
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
	// Checks if service CIDR has a nonempty intersection with cluster
	// CIDR. This is the case if either clusterCIDR contains serviceCIDR with
	// clusterCIDR's Mask applied (this means that clusterCIDR contains
	// serviceCIDR) or vice versa (which means that serviceCIDR contains
	// clusterCIDR).
	if !r.clusterCIDR.Contains(serviceCIDR.IP.Mask(r.clusterCIDR.Mask)) && !serviceCIDR.Contains(r.clusterCIDR.IP.Mask(serviceCIDR.Mask)) {
		return
	}

	if err := r.cidrs.Occupy(serviceCIDR); err != nil {
		glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err)
	}
}

// updateCIDRAllocation assigns the CIDR to the Node and sends an update to the API server.
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
	var err error
	var node *v1.Node
	defer r.removeNodeFromProcessing(data.nodeName)

	podCIDR := data.cidr.String()

	node, err = r.nodeLister.Get(data.nodeName)
	if err != nil {
		glog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDR: %v", data.nodeName, err)
		return err
	}

	if node.Spec.PodCIDR == podCIDR {
		glog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, podCIDR)
		return nil
	}
	if node.Spec.PodCIDR != "" {
		glog.Errorf("Node %v already has a CIDR allocated %v. Releasing the new one %v.", node.Name, node.Spec.PodCIDR, podCIDR)
		if err := r.cidrs.Release(data.cidr); err != nil {
			glog.Errorf("Error when releasing CIDR %v", podCIDR)
		}
		return nil
	}
	// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
	for i := 0; i < cidrUpdateRetries; i++ {
		if err = utilnode.PatchNodeCIDR(r.client, types.NodeName(node.Name), podCIDR); err == nil {
			glog.Infof("Set node %v PodCIDR to %v", node.Name, podCIDR)
			return nil
		}
	}
	glog.Errorf("Failed to update node %v PodCIDR to %v after multiple attempts: %v", node.Name, podCIDR, err)
	nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
	// We accept the fact that we may leak CIDRs here. This is safer than releasing
	// them when we don't know whether the request went through.
	// NodeController restart will return all falsely allocated CIDRs to the pool.
	if !apierrors.IsServerTimeout(err) {
		glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", node.Name, err)
		if releaseErr := r.cidrs.Release(data.cidr); releaseErr != nil {
			glog.Errorf("Error releasing allocated CIDR for node %v: %v", node.Name, releaseErr)
		}
	}
	return err
}