/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ipam

import (
	"fmt"
	"net"
	"sync"
	"time"

	"k8s.io/klog"

	"k8s.io/api/core/v1"
	informers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	cloudprovider "k8s.io/cloud-provider"
	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
	nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
)

// Config for the IPAM controller.
type Config struct {
	// Resync is the default timeout duration when there are no errors.
	Resync time.Duration
	// MaxBackoff is the maximum timeout when in an error backoff state.
	MaxBackoff time.Duration
	// InitialRetry is the initial retry interval when an error is reported.
	InitialRetry time.Duration
	// Mode to use to synchronize.
	Mode nodesync.NodeSyncMode
}
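
// For illustration, a plausible configuration. These particular values
// are assumptions for the sketch, not defaults shipped by this package:
//
//	cfg := &Config{
//		Resync:       30 * time.Second,
//		MaxBackoff:   10 * time.Second,
//		InitialRetry: 250 * time.Millisecond,
//		Mode:         nodesync.SyncFromCloud,
//	}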

// Controller is the controller for synchronizing cluster and cloud node
// pod CIDR range assignments.
type Controller struct {
	config  *Config
	adapter *adapter

	lock    sync.Mutex
	syncers map[string]*nodesync.NodeSync

	set *cidrset.CidrSet
}

// NewController returns a new instance of the IPAM controller.
func NewController(
	config *Config,
	kubeClient clientset.Interface,
	cloud cloudprovider.Interface,
	clusterCIDR, serviceCIDR *net.IPNet,
	nodeCIDRMaskSize int) (*Controller, error) {

	if !nodesync.IsValidMode(config.Mode) {
		return nil, fmt.Errorf("invalid IPAM controller mode %q", config.Mode)
	}

	gceCloud, ok := cloud.(*gce.Cloud)
	if !ok {
		return nil, fmt.Errorf("cloud IPAM controller does not support %q provider", cloud.ProviderName())
	}

	set, err := cidrset.NewCIDRSet(clusterCIDR, nodeCIDRMaskSize)
	if err != nil {
		return nil, err
	}

	c := &Controller{
		config:  config,
		adapter: newAdapter(kubeClient, gceCloud),
		syncers: make(map[string]*nodesync.NodeSync),
		set:     set,
	}

	if err := occupyServiceCIDR(c.set, clusterCIDR, serviceCIDR); err != nil {
		return nil, err
	}

	return c, nil
}
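
// A minimal wiring sketch, for orientation only. The surrounding setup
// (cfg, kubeClient, cloud, the CIDRs, and the node informer) is assumed
// to come from the caller's bootstrap code; this is not the exact
// bootstrap sequence:
//
//	c, err := NewController(cfg, kubeClient, cloud, clusterCIDR, serviceCIDR, 24)
//	if err != nil {
//		return err
//	}
//	if err := c.Start(nodeInformer); err != nil {
//		return err
//	}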

// Start initializes the Controller with the existing list of nodes and
// registers the informers for node changes. This will start synchronization
// of the node and cloud CIDR range allocations.
func (c *Controller) Start(nodeInformer informers.NodeInformer) error {
	klog.V(0).Infof("Starting IPAM controller (config=%+v)", c.config)

	nodes, err := listNodes(c.adapter.k8s)
	if err != nil {
		return err
	}
	for _, node := range nodes.Items {
		if node.Spec.PodCIDR != "" {
			_, cidrRange, err := net.ParseCIDR(node.Spec.PodCIDR)
			if err == nil {
				c.set.Occupy(cidrRange)
				klog.V(3).Infof("Occupying CIDR for node %q (%v)", node.Name, node.Spec.PodCIDR)
			} else {
				klog.Errorf("Node %q has an invalid CIDR (%q): %v", node.Name, node.Spec.PodCIDR, err)
			}
		}

		func() {
			c.lock.Lock()
			defer c.lock.Unlock()

			// XXX/bowei -- stagger the start of each sync cycle.
			syncer := c.newSyncer(node.Name)
			c.syncers[node.Name] = syncer
			go syncer.Loop(nil)
		}()
	}

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    nodeutil.CreateAddNodeHandler(c.onAdd),
		UpdateFunc: nodeutil.CreateUpdateNodeHandler(c.onUpdate),
		DeleteFunc: nodeutil.CreateDeleteNodeHandler(c.onDelete),
	})

	return nil
}
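
// Note the order above: syncers for existing nodes are seeded from the
// initial node list, and the informer handlers registered afterwards
// keep the syncer map current as nodes come and go.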

// occupyServiceCIDR removes the service CIDR range from the cluster CIDR if it
// intersects.
func occupyServiceCIDR(set *cidrset.CidrSet, clusterCIDR, serviceCIDR *net.IPNet) error {
	if clusterCIDR.Contains(serviceCIDR.IP) || serviceCIDR.Contains(clusterCIDR.IP) {
		if err := set.Occupy(serviceCIDR); err != nil {
			return err
		}
	}
	return nil
}
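
// For example, with a clusterCIDR of 10.0.0.0/8 and a serviceCIDR of
// 10.96.0.0/12, the service range falls inside the cluster range, so it
// is occupied up front and can never be handed out as a node's pod CIDR.
// The second Contains check covers the reverse case, where the service
// range is the larger of the two.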

// nodeState tracks the error backoff state for a single node's syncer.
type nodeState struct {
	t Timeout
}

// ReportResult records whether the last sync attempt succeeded, driving
// the backoff for subsequent attempts.
func (ns *nodeState) ReportResult(err error) {
	ns.t.Update(err == nil)
}

// ResyncTimeout returns how long to wait before the next sync attempt.
func (ns *nodeState) ResyncTimeout() time.Duration {
	return ns.t.Next()
}
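
// Taken together, these two methods pace a syncer roughly like this (a
// sketch, assuming the Timeout helper in this package doubles the
// interval on each failure, capped at MaxBackoff, and returns Resync
// once a sync succeeds):
//
//	successes: Resync, Resync, Resync, ...
//	failures:  InitialRetry, 2*InitialRetry, 4*InitialRetry, ..., MaxBackoff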

// newSyncer creates a NodeSync for the given node, backed by this
// controller's shared CIDR set and its backoff configuration.
func (c *Controller) newSyncer(name string) *nodesync.NodeSync {
	ns := &nodeState{
		Timeout{
			Resync:       c.config.Resync,
			MaxBackoff:   c.config.MaxBackoff,
			InitialRetry: c.config.InitialRetry,
		},
	}
	return nodesync.New(ns, c.adapter, c.adapter, c.config.Mode, name, c.set)
}

// onAdd starts a syncer for a newly observed node and feeds it the
// node's current state.
func (c *Controller) onAdd(node *v1.Node) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	syncer, ok := c.syncers[node.Name]
	if !ok {
		syncer = c.newSyncer(node.Name)
		c.syncers[node.Name] = syncer
		go syncer.Loop(nil)
	} else {
		klog.Warningf("Add for node %q that already exists", node.Name)
	}
	syncer.Update(node)

	return nil
}

// onUpdate forwards the updated node to its syncer.
func (c *Controller) onUpdate(_, node *v1.Node) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if sync, ok := c.syncers[node.Name]; ok {
		sync.Update(node)
	} else {
		klog.Errorf("Received update for non-existent node %q", node.Name)
		return fmt.Errorf("unknown node %q", node.Name)
	}

	return nil
}

// onDelete notifies the syncer for a deleted node and drops it from the
// syncer map.
func (c *Controller) onDelete(node *v1.Node) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if syncer, ok := c.syncers[node.Name]; ok {
		syncer.Delete(node)
		delete(c.syncers, node.Name)
	} else {
		klog.Warningf("Node %q was already deleted", node.Name)
	}

	return nil
}