Add functionality needed by Cluster Autoscaler to Kubemark Provider.

Make adding nodes asynchronous. Add a method for getting the target
size of a node group and a method for getting the node group for a
given node. Factor out some common code.
Beata Skiba 2017-08-08 20:50:17 +02:00
parent 7ef5cc23d1
commit 20a3756024
3 changed files with 123 additions and 60 deletions
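
For orientation, here is a minimal sketch of how Cluster Autoscaler code could
drive the API added in this commit. The helper name and the surrounding wiring
are illustrative assumptions, not part of the change itself:

// scaleUp is a hypothetical caller of the new KubemarkController methods.
func scaleUp(kc *kubemark.KubemarkController, nodeGroup string, delta int) error {
    // Target size counts nodes still waiting in the creation queue, so
    // repeated scale-ups do not double-count in-flight nodes.
    target, err := kc.GetNodeGroupTargetSize(nodeGroup)
    if err != nil {
        return err
    }
    // SetNodeGroupSize only enqueues the new nodes; the worker started by
    // Run() creates them asynchronously.
    return kc.SetNodeGroupSize(nodeGroup, target+delta)
}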


@@ -39,7 +39,6 @@ import (
 const (
     namespaceKubemark = "kubemark"
-    hollowNodeName    = "hollow-node"
     nodeGroupLabel    = "autoscaling.k8s.io/nodegroup"
     numRetries        = 3
 )
@@ -52,6 +51,9 @@ type KubemarkController struct {
     externalCluster externalCluster
     kubemarkCluster kubemarkCluster
     rand            *rand.Rand
+    createNodeQueue        chan string
+    nodeGroupQueueSize     map[string]int
+    nodeGroupQueueSizeLock sync.Mutex
 }
 
 // externalCluster is used to communicate with the external cluster that hosts
@@ -99,6 +101,9 @@ func NewKubemarkController(externalClient kubeclient.Interface, externalInformer
             nodesToDeleteLock: sync.Mutex{},
         },
         rand: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+        createNodeQueue:        make(chan string, 1000),
+        nodeGroupQueueSize:     make(map[string]int),
+        nodeGroupQueueSizeLock: sync.Mutex{},
     }
 
     kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -108,27 +113,29 @@ func NewKubemarkController(externalClient kubeclient.Interface, externalInformer
     return controller, nil
 }
 
-// Init waits for population of caches and populates the node template needed
-// for creation of kubemark nodes.
-func (kubemarkController *KubemarkController) Init(stopCh chan struct{}) {
-    if !controller.WaitForCacheSync("kubemark", stopCh,
+// WaitForCacheSync waits until all caches in the controller are populated.
+func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
+    return controller.WaitForCacheSync("kubemark", stopCh,
         kubemarkController.externalCluster.rcSynced,
         kubemarkController.externalCluster.podSynced,
-        kubemarkController.kubemarkCluster.nodeSynced) {
-        return
+        kubemarkController.kubemarkCluster.nodeSynced)
 }
 
-    // Get hollow node template from an existing hollow node to be able to create
-    // new nodes based on it.
+// Run populates the node template needed for creation of kubemark nodes and
+// starts the worker routine for creating new nodes.
+func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) {
     nodeTemplate, err := kubemarkController.getNodeTemplate()
     if err != nil {
-        glog.Fatalf("Failed to get node template: %s", err)
+        glog.Fatalf("failed to get node template: %s", err)
     }
     kubemarkController.nodeTemplate = nodeTemplate
+
+    go kubemarkController.runNodeCreation(stopCh)
+    <-stopCh
 }
 
-// GetNodesForNodegroup returns list of the nodes in the node group.
-func (kubemarkController *KubemarkController) GetNodeNamesForNodegroup(nodeGroup string) ([]string, error) {
+// GetNodeNamesForNodeGroup returns list of the nodes in the node group.
+func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) {
     selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
     pods, err := kubemarkController.externalCluster.podLister.List(selector)
     if err != nil {
@@ -141,7 +148,7 @@ func (kubemarkController *KubemarkController) GetNodeNamesForNodegroup(nodeGroup
     return result, nil
 }
 
-// GetNodeGroupSize returns the current size for the node group.
+// GetNodeGroupSize returns the current size for the node group as observed.
 func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
     selector := labels.SelectorFromSet(labels.Set(map[string]string{nodeGroupLabel: nodeGroup}))
     nodes, err := kubemarkController.externalCluster.rcLister.List(selector)
@@ -151,21 +158,33 @@ func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string)
     return len(nodes), nil
 }
 
+// GetNodeGroupTargetSize returns the size of the node group as a sum of current
+// observed size and number of upcoming nodes.
+func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) {
+    kubemarkController.nodeGroupQueueSizeLock.Lock()
+    defer kubemarkController.nodeGroupQueueSizeLock.Unlock()
+    realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
+    if err != nil {
+        return realSize, err
+    }
+    return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil
+}
+
 // SetNodeGroupSize changes the size of node group by adding or removing nodes.
 func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
-    currSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
+    currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup)
     if err != nil {
         return err
     }
     switch delta := size - currSize; {
     case delta < 0:
         absDelta := -delta
-        nodes, err := kubemarkController.GetNodeNamesForNodegroup(nodeGroup)
+        nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup)
         if err != nil {
             return err
         }
-        if len(nodes) > absDelta {
-            return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes", absDelta, nodeGroup)
+        if len(nodes) < absDelta {
+            return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes))
         }
         for i, node := range nodes {
             if i == absDelta {
@@ -176,16 +195,31 @@ func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string,
             }
         }
     case delta > 0:
+        kubemarkController.nodeGroupQueueSizeLock.Lock()
         for i := 0; i < delta; i++ {
-            if err := kubemarkController.addNodeToNodeGroup(nodeGroup); err != nil {
-                return err
-            }
+            kubemarkController.nodeGroupQueueSize[nodeGroup]++
+            kubemarkController.createNodeQueue <- nodeGroup
         }
+        kubemarkController.nodeGroupQueueSizeLock.Unlock()
     }
     return nil
 }
 
+// GetNodeGroupForNode returns the name of the node group to which the node
+// belongs.
+func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) {
+    pod := kubemarkController.getPodByName(node)
+    if pod == nil {
+        return "", fmt.Errorf("node %s does not exist", node)
+    }
+    nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel]
+    if ok {
+        return nodeGroup, nil
+    }
+    return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel)
+}
+
 func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
     templateCopy, err := api.Scheme.Copy(kubemarkController.nodeTemplate)
     if err != nil {
@@ -207,18 +241,18 @@ func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup strin
 }
 
 func (kubemarkController *KubemarkController) removeNodeFromNodeGroup(nodeGroup string, node string) error {
-    pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
-    if err != nil {
-        return err
+    pod := kubemarkController.getPodByName(node)
+    if pod == nil {
+        glog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup)
+        return nil
     }
-    for _, pod := range pods {
-        if pod.ObjectMeta.Name == node {
     if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
         return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
     }
     policy := metav1.DeletePropagationForeground
+    var err error
     for i := 0; i < numRetries; i++ {
-        err := kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
+        err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
             pod.ObjectMeta.Labels["name"],
             &metav1.DeleteOptions{PropagationPolicy: &policy})
         if err == nil {
@@ -232,10 +266,7 @@ func (kubemarkController *KubemarkController) removeNodeFromNodeGroup(nodeGroup
             return nil
         }
     }
-            }
-        }
-    return fmt.Errorf("can't delete node %s from nodegroup %s. Node does not exist", node, nodeGroup)
+    return fmt.Errorf("Failed to delete node %s: %v", node, err)
 }
 
 func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController {
@@ -251,6 +282,19 @@ func (kubemarkController *KubemarkController) getReplicationControllerByName(nam
     return nil
 }
 
+func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod {
+    pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
+    if err != nil {
+        return nil
+    }
+    for _, pod := range pods {
+        if pod.ObjectMeta.Name == name {
+            return pod
+        }
+    }
+    return nil
+}
+
 func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) {
     pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
     if err != nil {
@@ -293,6 +337,24 @@ func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.Replicat
     return nil, fmt.Errorf("can't get hollow node template")
 }
 
+func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) {
+    for {
+        select {
+        case nodeGroup := <-kubemarkController.createNodeQueue:
+            kubemarkController.nodeGroupQueueSizeLock.Lock()
+            err := kubemarkController.addNodeToNodeGroup(nodeGroup)
+            if err != nil {
+                glog.Errorf("failed to add node to node group %s: %v", nodeGroup, err)
+            } else {
+                kubemarkController.nodeGroupQueueSize[nodeGroup]--
+            }
+            kubemarkController.nodeGroupQueueSizeLock.Unlock()
+        case <-stop:
+            return
+        }
+    }
+}
+
 func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) {
     nodes, err := kubemarkCluster.nodeLister.List(labels.Everything())
     if err != nil {
@@ -318,7 +380,7 @@ func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{},
         if kubemarkCluster.nodesToDelete[node.Name] {
             kubemarkCluster.nodesToDelete[node.Name] = false
             if err := kubemarkCluster.client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{}); err != nil {
-                glog.Errorf("failed to delete node %s from kubemark cluster", node.Name)
+                glog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err)
             }
         }
         return
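
The scale-up path above is a plain producer/consumer pattern: a buffered
channel carries one work item per requested node, and a mutex-guarded counter
tracks the per-group backlog so the target size can be reported as observed
size plus backlog. A condensed, self-contained sketch of that pattern (names
are illustrative, not the commit's):

type nodeQueue struct {
    work    chan string    // one item per node to create
    backlog map[string]int // per-group count of queued creations
    mu      sync.Mutex
}

func (q *nodeQueue) enqueue(group string) {
    q.mu.Lock()
    q.backlog[group]++
    q.mu.Unlock()
    q.work <- group
}

func (q *nodeQueue) run(create func(string) error, stop <-chan struct{}) {
    for {
        select {
        case group := <-q.work:
            err := create(group)
            q.mu.Lock()
            if err == nil {
                q.backlog[group]-- // node is no longer "upcoming"
            }
            q.mu.Unlock()
        case <-stop:
            return
        }
    }
}

Note that SetNodeGroupSize sends to createNodeQueue while still holding
nodeGroupQueueSizeLock; the 1000-slot buffer means the producer does not
normally block while holding the lock.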


@@ -210,7 +210,8 @@ func (f *Framework) BeforeEach() {
         TestContext.CloudConfig.KubemarkController, err = kubemark.NewKubemarkController(externalClient, externalInformerFactory, f.ClientSet, kubemarkNodeInformer)
         Expect(err).NotTo(HaveOccurred())
         externalInformerFactory.Start(f.kubemarkControllerCloseChannel)
-        TestContext.CloudConfig.KubemarkController.Init(f.kubemarkControllerCloseChannel)
+        Expect(TestContext.CloudConfig.KubemarkController.WaitForCacheSync(f.kubemarkControllerCloseChannel)).To(BeTrue())
+        go TestContext.CloudConfig.KubemarkController.Run(f.kubemarkControllerCloseChannel)
     }
 }
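
The same startup sequence outside the Ginkgo framework would look roughly like
this (construction arguments abbreviated, error handling sketched; all method
names are taken from the diff above):

stopCh := make(chan struct{})
controller, err := kubemark.NewKubemarkController(externalClient, externalInformerFactory, kubemarkClient, kubemarkNodeInformer)
if err != nil {
    return err
}
externalInformerFactory.Start(stopCh)
if !controller.WaitForCacheSync(stopCh) {
    return fmt.Errorf("timed out waiting for kubemark caches")
}
go controller.Run(stopCh)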


@@ -75,7 +75,7 @@ func GetGroupNodes(group string) ([]string, error) {
         }
         return lines, nil
     } else if TestContext.Provider == "kubemark" {
-        return TestContext.CloudConfig.KubemarkController.GetNodeNamesForNodegroup(group)
+        return TestContext.CloudConfig.KubemarkController.GetNodeNamesForNodeGroup(group)
     } else {
         return nil, fmt.Errorf("provider does not support InstanceGroups")
     }