diff --git a/pkg/kubemark/controller.go b/pkg/kubemark/controller.go
index 53d163175c..017524b7ce 100644
--- a/pkg/kubemark/controller.go
+++ b/pkg/kubemark/controller.go
@@ -39,7 +39,6 @@ import (
 
 const (
 	namespaceKubemark = "kubemark"
-	hollowNodeName    = "hollow-node"
 	nodeGroupLabel    = "autoscaling.k8s.io/nodegroup"
 	numRetries        = 3
 )
@@ -48,10 +47,13 @@ const (
 // to add and delete nodes from a kubemark cluster and introduces nodegroups
 // by applying labels to the kubemark's hollow-nodes.
 type KubemarkController struct {
-	nodeTemplate    *apiv1.ReplicationController
-	externalCluster externalCluster
-	kubemarkCluster kubemarkCluster
-	rand            *rand.Rand
+	nodeTemplate           *apiv1.ReplicationController
+	externalCluster        externalCluster
+	kubemarkCluster        kubemarkCluster
+	rand                   *rand.Rand
+	createNodeQueue        chan string
+	nodeGroupQueueSize     map[string]int
+	nodeGroupQueueSizeLock sync.Mutex
 }
 
 // externalCluster is used to communicate with the external cluster that hosts
@@ -98,7 +100,10 @@ func NewKubemarkController(externalClient kubeclient.Interface, externalInformer
 			nodesToDelete:     make(map[string]bool),
 			nodesToDeleteLock: sync.Mutex{},
 		},
-		rand: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+		rand:                   rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
+		createNodeQueue:        make(chan string, 1000),
+		nodeGroupQueueSize:     make(map[string]int),
+		nodeGroupQueueSizeLock: sync.Mutex{},
 	}
 
 	kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -108,27 +113,29 @@ func NewKubemarkController(externalClient kubeclient.Interface, externalInformer
 	return controller, nil
 }
 
-// Init waits for population of caches and populates the node template needed
-// for creation of kubemark nodes.
-func (kubemarkController *KubemarkController) Init(stopCh chan struct{}) {
-	if !controller.WaitForCacheSync("kubemark", stopCh,
+// WaitForCacheSync waits until all caches in the controller are populated.
+func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
+	return controller.WaitForCacheSync("kubemark", stopCh,
 		kubemarkController.externalCluster.rcSynced,
 		kubemarkController.externalCluster.podSynced,
-		kubemarkController.kubemarkCluster.nodeSynced) {
-		return
-	}
-
-	// Get hollow node template from an existing hollow node to be able to create
-	// new nodes based on it.
-	nodeTemplate, err := kubemarkController.getNodeTemplate()
-	if err != nil {
-		glog.Fatalf("Failed to get node template: %s", err)
-	}
-	kubemarkController.nodeTemplate = nodeTemplate
+		kubemarkController.kubemarkCluster.nodeSynced)
 }
 
-// GetNodesForNodegroup returns list of the nodes in the node group.
-func (kubemarkController *KubemarkController) GetNodeNamesForNodegroup(nodeGroup string) ([]string, error) {
+// Run populates the node template needed for creation of kubemark nodes and
+// starts the worker routine for creating new nodes.
+func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) {
+	nodeTemplate, err := kubemarkController.getNodeTemplate()
+	if err != nil {
+		glog.Fatalf("failed to get node template: %s", err)
+	}
+	kubemarkController.nodeTemplate = nodeTemplate
+
+	go kubemarkController.runNodeCreation(stopCh)
+	<-stopCh
+}
+
+// GetNodeNamesForNodeGroup returns a list of the nodes in the node group.
+func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) {
 	selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
 	pods, err := kubemarkController.externalCluster.podLister.List(selector)
 	if err != nil {
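Note: the replacement of Init above splits the startup contract in two. Callers now wait for the caches themselves via WaitForCacheSync and then drive Run, which blocks until the stop channel closes, so it needs its own goroutine. A minimal sketch of the new lifecycle, mirroring the framework.go hunk at the end of this diff (the informer factory, controller, and stop channel are assumed to be set up by the caller):

	stopCh := make(chan struct{})
	defer close(stopCh)

	// Informers must be started before waiting, or the caches never fill.
	externalInformerFactory.Start(stopCh)
	if !kubemarkController.WaitForCacheSync(stopCh) {
		glog.Fatal("kubemark controller caches failed to sync")
	}

	// Run blocks until stopCh is closed; run it in its own goroutine.
	go kubemarkController.Run(stopCh)
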
@@ -141,7 +148,7 @@ func (kubemarkController *KubemarkController) GetNodeNamesForNodegroup(nodeGroup
 	return result, nil
 }
 
-// GetNodeGroupSize returns the current size for the node group.
+// GetNodeGroupSize returns the current size for the node group as observed.
 func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
 	selector := labels.SelectorFromSet(labels.Set(map[string]string{nodeGroupLabel: nodeGroup}))
 	nodes, err := kubemarkController.externalCluster.rcLister.List(selector)
@@ -151,21 +158,33 @@ func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string)
 	return len(nodes), nil
 }
 
+// GetNodeGroupTargetSize returns the size of the node group as a sum of the
+// current observed size and the number of upcoming nodes.
+func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) {
+	kubemarkController.nodeGroupQueueSizeLock.Lock()
+	defer kubemarkController.nodeGroupQueueSizeLock.Unlock()
+	realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
+	if err != nil {
+		return realSize, err
+	}
+	return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil
+}
+
 // SetNodeGroupSize changes the size of node group by adding or removing nodes.
 func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
-	currSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
+	currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup)
 	if err != nil {
 		return err
 	}
 	switch delta := size - currSize; {
 	case delta < 0:
 		absDelta := -delta
-		nodes, err := kubemarkController.GetNodeNamesForNodegroup(nodeGroup)
+		nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup)
 		if err != nil {
 			return err
 		}
-		if len(nodes) > absDelta {
-			return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes", absDelta, nodeGroup)
+		if len(nodes) < absDelta {
+			return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes))
 		}
 		for i, node := range nodes {
 			if i == absDelta {
@@ -176,16 +195,31 @@ func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string,
 			}
 		}
 	case delta > 0:
+		kubemarkController.nodeGroupQueueSizeLock.Lock()
 		for i := 0; i < delta; i++ {
-			if err := kubemarkController.addNodeToNodeGroup(nodeGroup); err != nil {
-				return err
-			}
+			kubemarkController.nodeGroupQueueSize[nodeGroup]++
+			kubemarkController.createNodeQueue <- nodeGroup
 		}
+		kubemarkController.nodeGroupQueueSizeLock.Unlock()
 	}
 
 	return nil
 }
 
+// GetNodeGroupForNode returns the name of the node group to which the node
+// belongs.
+func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) {
+	pod := kubemarkController.getPodByName(node)
+	if pod == nil {
+		return "", fmt.Errorf("node %s does not exist", node)
+	}
+	nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel]
+	if ok {
+		return nodeGroup, nil
+	}
+	return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel)
+}
+
 func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
 	templateCopy, err := api.Scheme.Copy(kubemarkController.nodeTemplate)
 	if err != nil {
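Note: with the two hunks above, scale-up becomes asynchronous. SetNodeGroupSize no longer creates nodes inline: for a positive delta it only bumps nodeGroupQueueSize and pushes the group name onto createNodeQueue, and the actual creation happens in the runNodeCreation worker added further down. GetNodeGroupTargetSize reports observed plus queued nodes, so calling SetNodeGroupSize twice with the same target computes a zero delta instead of enqueueing duplicates. A stripped-down sketch of that counter-plus-channel pattern, with standalone names that are not taken from this file:

	package nodequeue

	import "sync"

	// creator mirrors the bookkeeping introduced above, under generic names.
	type creator struct {
		queue  chan string    // pending creations, drained by a single worker
		queued map[string]int // in-flight creations per node group
		mu     sync.Mutex     // guards queued
	}

	// targetSize is the analogue of GetNodeGroupTargetSize: observed + upcoming.
	func (c *creator) targetSize(group string, observed int) int {
		c.mu.Lock()
		defer c.mu.Unlock()
		return observed + c.queued[group]
	}

	// enqueue is the analogue of the delta > 0 branch of SetNodeGroupSize.
	// Sends happen under the lock, and the worker also takes the lock per item,
	// so the channel buffer (1000 in this diff) has to be large enough to
	// absorb a whole batch.
	func (c *creator) enqueue(group string, delta int) {
		c.mu.Lock()
		defer c.mu.Unlock()
		for i := 0; i < delta; i++ {
			c.queued[group]++
			c.queue <- group
		}
	}
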
@@ -207,35 +241,32 @@ func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup strin
 }
 
 func (kubemarkController *KubemarkController) removeNodeFromNodeGroup(nodeGroup string, node string) error {
-	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
-	if err != nil {
-		return err
+	pod := kubemarkController.getPodByName(node)
+	if pod == nil {
+		glog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup)
+		return nil
 	}
-	for _, pod := range pods {
-		if pod.ObjectMeta.Name == node {
-			if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
-				return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
-			}
-			policy := metav1.DeletePropagationForeground
-			for i := 0; i < numRetries; i++ {
-				err := kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
-					pod.ObjectMeta.Labels["name"],
-					&metav1.DeleteOptions{PropagationPolicy: &policy})
-				if err == nil {
-					glog.Infof("marking node %s for deletion", node)
-					// Mark node for deletion from kubemark cluster.
-					// Once it becomes unready after replication controller
-					// deletion has been noticed, we will delete it explicitly.
-					// This is to cover for the fact that kubemark does not
-					// take care of this itself.
-					kubemarkController.kubemarkCluster.markNodeForDeletion(node)
-					return nil
-				}
-			}
+	if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
+		return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
+	}
+	policy := metav1.DeletePropagationForeground
+	var err error
+	for i := 0; i < numRetries; i++ {
+		err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(
+			pod.ObjectMeta.Labels["name"],
+			&metav1.DeleteOptions{PropagationPolicy: &policy})
+		if err == nil {
+			glog.Infof("marking node %s for deletion", node)
+			// Mark node for deletion from kubemark cluster.
+			// Once it becomes unready after replication controller
+			// deletion has been noticed, we will delete it explicitly.
+			// This is to cover for the fact that kubemark does not
+			// take care of this itself.
+			kubemarkController.kubemarkCluster.markNodeForDeletion(node)
+			return nil
 		}
 	}
-
-	return fmt.Errorf("can't delete node %s from nodegroup %s. Node does not exist", node, nodeGroup)
Node does not exist", node, nodeGroup) + return fmt.Errorf("Failed to delete node %s: %v", node, err) } func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController { @@ -251,6 +282,19 @@ func (kubemarkController *KubemarkController) getReplicationControllerByName(nam return nil } +func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod { + pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything()) + if err != nil { + return nil + } + for _, pod := range pods { + if pod.ObjectMeta.Name == name { + return pod + } + } + return nil +} + func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) { pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything()) if err != nil { @@ -293,6 +337,24 @@ func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.Replicat return nil, fmt.Errorf("can't get hollow node template") } +func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) { + for { + select { + case nodeGroup := <-kubemarkController.createNodeQueue: + kubemarkController.nodeGroupQueueSizeLock.Lock() + err := kubemarkController.addNodeToNodeGroup(nodeGroup) + if err != nil { + glog.Errorf("failed to add node to node group %s: %v", nodeGroup, err) + } else { + kubemarkController.nodeGroupQueueSize[nodeGroup]-- + } + kubemarkController.nodeGroupQueueSizeLock.Unlock() + case <-stop: + return + } + } +} + func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) { nodes, err := kubemarkCluster.nodeLister.List(labels.Everything()) if err != nil { @@ -318,7 +380,7 @@ func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{}, if kubemarkCluster.nodesToDelete[node.Name] { kubemarkCluster.nodesToDelete[node.Name] = false if err := kubemarkCluster.client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{}); err != nil { - glog.Errorf("failed to delete node %s from kubemark cluster", node.Name) + glog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err) } } return diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index f4fa2bfa8f..e938e32544 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -210,7 +210,8 @@ func (f *Framework) BeforeEach() { TestContext.CloudConfig.KubemarkController, err = kubemark.NewKubemarkController(externalClient, externalInformerFactory, f.ClientSet, kubemarkNodeInformer) Expect(err).NotTo(HaveOccurred()) externalInformerFactory.Start(f.kubemarkControllerCloseChannel) - TestContext.CloudConfig.KubemarkController.Init(f.kubemarkControllerCloseChannel) + Expect(TestContext.CloudConfig.KubemarkController.WaitForCacheSync(f.kubemarkControllerCloseChannel)).To(BeTrue()) + go TestContext.CloudConfig.KubemarkController.Run(f.kubemarkControllerCloseChannel) } } diff --git a/test/e2e/framework/size.go b/test/e2e/framework/size.go index 7883f41fc0..8878e89842 100644 --- a/test/e2e/framework/size.go +++ b/test/e2e/framework/size.go @@ -75,7 +75,7 @@ func GetGroupNodes(group string) ([]string, error) { } return lines, nil } else if TestContext.Provider == "kubemark" { - return TestContext.CloudConfig.KubemarkController.GetNodeNamesForNodegroup(group) + return TestContext.CloudConfig.KubemarkController.GetNodeNamesForNodeGroup(group) } else { return nil, fmt.Errorf("provider does not support InstanceGroups") }