From 20a375602455d64670092a1101f84998221c9308 Mon Sep 17 00:00:00 2001
From: Beata Skiba
Date: Tue, 8 Aug 2017 20:50:17 +0200
Subject: [PATCH] Add functionality needed by Cluster Autoscaler to Kubemark
 Provider.

Make adding nodes asynchronous.
Add method for getting target size of node group.
Add method for getting node group for node.
Factor out some common code.
---
 pkg/kubemark/controller.go      | 178 +++++++++++++++++++++-----------
 test/e2e/framework/framework.go |   3 +-
 test/e2e/framework/size.go      |   2 +-
 3 files changed, 123 insertions(+), 60 deletions(-)

diff --git a/pkg/kubemark/controller.go b/pkg/kubemark/controller.go
index 53d163175c..017524b7ce 100644
--- a/pkg/kubemark/controller.go
+++ b/pkg/kubemark/controller.go
@@ -39,7 +39,6 @@ import (
 
 const (
 	namespaceKubemark = "kubemark"
-	hollowNodeName    = "hollow-node"
 	nodeGroupLabel    = "autoscaling.k8s.io/nodegroup"
 	numRetries        = 3
 )
@@ -48,10 +47,13 @@ const (
 // to add and delete nodes from a kubemark cluster and introduces nodegroups
 // by applying labels to the kubemark's hollow-nodes.
 type KubemarkController struct {
-	nodeTemplate    *apiv1.ReplicationController
-	externalCluster externalCluster
-	kubemarkCluster kubemarkCluster
-	rand            *rand.Rand
+	nodeTemplate           *apiv1.ReplicationController
+	externalCluster        externalCluster
+	kubemarkCluster        kubemarkCluster
+	rand                   *rand.Rand
+	createNodeQueue        chan string
+	nodeGroupQueueSize     map[string]int
+	nodeGroupQueueSizeLock sync.Mutex
 }
 
 // externalCluster is used to communicate with the external cluster that hosts
@@ -98,7 +100,10 @@ func NewKubemarkController(externalClient kubeclient.Interface, externalInformer
 			nodesToDelete:     make(map[string]bool),
 			nodesToDeleteLock: sync.Mutex{},
 		},
-		rand: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+		rand:                   rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+		createNodeQueue:        make(chan string, 1000),
+		nodeGroupQueueSize:     make(map[string]int),
+		nodeGroupQueueSizeLock: sync.Mutex{},
 	}
 
 	kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -108,27 +113,29 @@ func NewKubemarkController(externalClient kubeclient.Interface, externalInformer
 	return controller, nil
 }
 
-// Init waits for population of caches and populates the node template needed
-// for creation of kubemark nodes.
-func (kubemarkController *KubemarkController) Init(stopCh chan struct{}) {
-	if !controller.WaitForCacheSync("kubemark", stopCh,
+// WaitForCacheSync waits until all caches in the controller are populated.
+func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
+	return controller.WaitForCacheSync("kubemark", stopCh,
 		kubemarkController.externalCluster.rcSynced,
 		kubemarkController.externalCluster.podSynced,
-		kubemarkController.kubemarkCluster.nodeSynced) {
-		return
-	}
-
-	// Get hollow node template from an existing hollow node to be able to create
-	// new nodes based on it.
-	nodeTemplate, err := kubemarkController.getNodeTemplate()
-	if err != nil {
-		glog.Fatalf("Failed to get node template: %s", err)
-	}
-	kubemarkController.nodeTemplate = nodeTemplate
+		kubemarkController.kubemarkCluster.nodeSynced)
 }
 
-// GetNodesForNodegroup returns list of the nodes in the node group.
-func (kubemarkController *KubemarkController) GetNodeNamesForNodegroup(nodeGroup string) ([]string, error) {
+// Run populates the node template needed for creation of kubemark nodes and
+// starts the worker routine for creating new nodes.
+func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) {
+	nodeTemplate, err := kubemarkController.getNodeTemplate()
+	if err != nil {
+		glog.Fatalf("failed to get node template: %s", err)
+	}
+	kubemarkController.nodeTemplate = nodeTemplate
+
+	go kubemarkController.runNodeCreation(stopCh)
+	<-stopCh
+}
+
+// GetNodeNamesForNodeGroup returns a list of the nodes in the node group.
+func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) {
 	selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
 	pods, err := kubemarkController.externalCluster.podLister.List(selector)
 	if err != nil {
@@ -141,7 +148,7 @@ func (kubemarkController *KubemarkController) GetNodeNamesForNodegroup(nodeGroup
 	return result, nil
 }
 
-// GetNodeGroupSize returns the current size for the node group.
+// GetNodeGroupSize returns the currently observed size of the node group.
 func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
 	selector := labels.SelectorFromSet(labels.Set(map[string]string{nodeGroupLabel: nodeGroup}))
 	nodes, err := kubemarkController.externalCluster.rcLister.List(selector)
@@ -151,21 +158,33 @@ func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string)
 	return len(nodes), nil
 }
 
+// GetNodeGroupTargetSize returns the size of the node group as the sum of the
+// currently observed size and the number of upcoming nodes.
+func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) {
+	kubemarkController.nodeGroupQueueSizeLock.Lock()
+	defer kubemarkController.nodeGroupQueueSizeLock.Unlock()
+	realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
+	if err != nil {
+		return realSize, err
+	}
+	return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil
+}
+
 // SetNodeGroupSize changes the size of node group by adding or removing nodes.
 func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
-	currSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
+	currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup)
 	if err != nil {
 		return err
 	}
 	switch delta := size - currSize; {
 	case delta < 0:
 		absDelta := -delta
-		nodes, err := kubemarkController.GetNodeNamesForNodegroup(nodeGroup)
+		nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup)
 		if err != nil {
 			return err
 		}
-		if len(nodes) > absDelta {
-			return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes", absDelta, nodeGroup)
+		if len(nodes) < absDelta {
+			return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes))
 		}
 		for i, node := range nodes {
 			if i == absDelta {
@@ -176,16 +195,31 @@ func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string,
 			}
 		}
 	case delta > 0:
+		kubemarkController.nodeGroupQueueSizeLock.Lock()
 		for i := 0; i < delta; i++ {
-			if err := kubemarkController.addNodeToNodeGroup(nodeGroup); err != nil {
-				return err
-			}
+			kubemarkController.nodeGroupQueueSize[nodeGroup]++
+			kubemarkController.createNodeQueue <- nodeGroup
 		}
+		kubemarkController.nodeGroupQueueSizeLock.Unlock()
 	}
 
 	return nil
 }
 
+// GetNodeGroupForNode returns the name of the node group to which the node
+// belongs.
+func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) {
+	pod := kubemarkController.getPodByName(node)
+	if pod == nil {
+		return "", fmt.Errorf("node %s does not exist", node)
+	}
+	nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel]
+	if ok {
+		return nodeGroup, nil
+	}
+	return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel)
+}
+
 func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
 	templateCopy, err := api.Scheme.Copy(kubemarkController.nodeTemplate)
 	if err != nil {
@@ -207,35 +241,32 @@ func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup strin
 }
 
 func (kubemarkController *KubemarkController) removeNodeFromNodeGroup(nodeGroup string, node string) error {
-	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
-	if err != nil {
-		return err
+	pod := kubemarkController.getPodByName(node)
+	if pod == nil {
+		glog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup)
+		return nil
 	}
-	for _, pod := range pods {
-		if pod.ObjectMeta.Name == node {
-			if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
-				return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
-			}
-			policy := metav1.DeletePropagationForeground
-			for i := 0; i < numRetries; i++ {
-				err := kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
-					pod.ObjectMeta.Labels["name"],
-					&metav1.DeleteOptions{PropagationPolicy: &policy})
-				if err == nil {
-					glog.Infof("marking node %s for deletion", node)
-					// Mark node for deletion from kubemark cluster.
-					// Once it becomes unready after replication controller
-					// deletion has been noticed, we will delete it explicitly.
-					// This is to cover for the fact that kubemark does not
-					// take care of this itself.
-					kubemarkController.kubemarkCluster.markNodeForDeletion(node)
-					return nil
-				}
-			}
+	if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
+		return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
+	}
+	policy := metav1.DeletePropagationForeground
+	var err error
+	for i := 0; i < numRetries; i++ {
+		err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
+			pod.ObjectMeta.Labels["name"],
+			&metav1.DeleteOptions{PropagationPolicy: &policy})
+		if err == nil {
+			glog.Infof("marking node %s for deletion", node)
+			// Mark node for deletion from kubemark cluster.
+			// Once it becomes unready after replication controller
+			// deletion has been noticed, we will delete it explicitly.
+			// This is to cover for the fact that kubemark does not
+			// take care of this itself.
+			kubemarkController.kubemarkCluster.markNodeForDeletion(node)
+			return nil
+		}
 	}
-
-	return fmt.Errorf("can't delete node %s from nodegroup %s. Node does not exist", node, nodeGroup)
+	return fmt.Errorf("failed to delete node %s: %v", node, err)
 }
 
 func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController {
@@ -251,6 +282,19 @@ func (kubemarkController *KubemarkController) getReplicationControllerByName(nam
 	return nil
 }
 
+func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod {
+	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
+	if err != nil {
+		return nil
+	}
+	for _, pod := range pods {
+		if pod.ObjectMeta.Name == name {
+			return pod
+		}
+	}
+	return nil
+}
+
 func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) {
 	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
 	if err != nil {
@@ -293,6 +337,24 @@ func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.Replicat
 	return nil, fmt.Errorf("can't get hollow node template")
 }
 
+func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) {
+	for {
+		select {
+		case nodeGroup := <-kubemarkController.createNodeQueue:
+			kubemarkController.nodeGroupQueueSizeLock.Lock()
+			err := kubemarkController.addNodeToNodeGroup(nodeGroup)
+			if err != nil {
+				glog.Errorf("failed to add node to node group %s: %v", nodeGroup, err)
+			} else {
+				kubemarkController.nodeGroupQueueSize[nodeGroup]--
+			}
+			kubemarkController.nodeGroupQueueSizeLock.Unlock()
+		case <-stop:
+			return
+		}
+	}
+}
+
 func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) {
 	nodes, err := kubemarkCluster.nodeLister.List(labels.Everything())
 	if err != nil {
@@ -318,7 +380,7 @@ func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{},
 	if kubemarkCluster.nodesToDelete[node.Name] {
 		kubemarkCluster.nodesToDelete[node.Name] = false
 		if err := kubemarkCluster.client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{}); err != nil {
-			glog.Errorf("failed to delete node %s from kubemark cluster", node.Name)
+			glog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err)
 		}
 	}
 	return
diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go
index f4fa2bfa8f..e938e32544 100644
--- a/test/e2e/framework/framework.go
+++ b/test/e2e/framework/framework.go
@@ -210,7 +210,8 @@ func (f *Framework) BeforeEach() {
 		TestContext.CloudConfig.KubemarkController, err = kubemark.NewKubemarkController(externalClient, externalInformerFactory, f.ClientSet, kubemarkNodeInformer)
 		Expect(err).NotTo(HaveOccurred())
 		externalInformerFactory.Start(f.kubemarkControllerCloseChannel)
-		TestContext.CloudConfig.KubemarkController.Init(f.kubemarkControllerCloseChannel)
+		Expect(TestContext.CloudConfig.KubemarkController.WaitForCacheSync(f.kubemarkControllerCloseChannel)).To(BeTrue())
+		go TestContext.CloudConfig.KubemarkController.Run(f.kubemarkControllerCloseChannel)
 	}
 }
 
diff --git a/test/e2e/framework/size.go b/test/e2e/framework/size.go
index 7883f41fc0..8878e89842 100644
--- a/test/e2e/framework/size.go
+++ b/test/e2e/framework/size.go
@@ -75,7 +75,7 @@ func GetGroupNodes(group string) ([]string, error) {
 		}
 		return lines, nil
 	} else if TestContext.Provider == "kubemark" {
-		return TestContext.CloudConfig.KubemarkController.GetNodeNamesForNodegroup(group)
+		return TestContext.CloudConfig.KubemarkController.GetNodeNamesForNodeGroup(group)
 	} else {
 		return nil, fmt.Errorf("provider does not support InstanceGroups")
 	}
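
Reviewer note (not part of the patch): below is a minimal sketch of how the new
KubemarkController API is intended to be driven, mirroring the wiring in
test/e2e/framework/framework.go above. The controller value, the stop channel,
the node group name "ng-1", and the node name "node-name" are placeholders for
illustration; the import path matches this repository's pkg/kubemark. The key
point it demonstrates: Run blocks on stopCh (so it gets its own goroutine), and
scale-up is now asynchronous, so target size moves ahead of observed size.

package example

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubemark"
)

// driveController shows the expected calling sequence: sync caches first,
// start the node-creation worker, then resize and inspect a node group.
func driveController(c *kubemark.KubemarkController, stopCh chan struct{}) error {
	if !c.WaitForCacheSync(stopCh) {
		return fmt.Errorf("kubemark caches did not sync")
	}
	go c.Run(stopCh) // Run blocks until stopCh is closed

	// SetNodeGroupSize only enqueues creations on createNodeQueue, so the
	// target size moves immediately while the observed size catches up as
	// runNodeCreation drains the queue.
	if err := c.SetNodeGroupSize("ng-1", 5); err != nil {
		return err
	}
	target, err := c.GetNodeGroupTargetSize("ng-1") // observed + queued
	if err != nil {
		return err
	}
	observed, err := c.GetNodeGroupSize("ng-1") // running replication controllers only
	if err != nil {
		return err
	}
	fmt.Printf("target=%d observed=%d\n", target, observed)

	// Reverse lookup added by this patch: map a hollow-node name back to the
	// node group it was created in, via the nodeGroupLabel on its pod.
	group, err := c.GetNodeGroupForNode("node-name")
	if err != nil {
		return err
	}
	fmt.Printf("node belongs to group %s\n", group)
	return nil
}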