Merge pull request #52917 from juanvallejo/jvallejo/support-multiple-node-selection

Automatic merge from submit-queue (batch tested with PRs 53454, 53446, 52935, 53443, 52917). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

add --selector flag support

**Release note**:
```release-note
NONE
```

Adds a --selector option to kubectl drain, cordon, and uncordon, performing
each action on the list of nodes matching the given label selector.
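
For illustration, a rough usage sketch (the node names and the `pool=batch` label are made up; any label selector works the same way):

```sh
# Label a couple of nodes, then operate on them by selector (hypothetical names/label).
kubectl label nodes node-1 node-2 pool=batch

kubectl cordon --selector pool=batch       # or the short form: -l pool=batch
kubectl drain -l pool=batch --ignore-daemonsets
kubectl uncordon -l pool=batch

# Combining an explicit node name with --selector is rejected:
kubectl drain node-1 -l pool=batch
# error: cannot specify both a node name and a --selector option
```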

If at least one node fails to be cordoned or uncordoned, the command
continues to operate on the remaining nodes uninterrupted, reporting
any errors it encounters along the way.
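
A sketch of what that looks like for cordon, based on the error and success messages added in this change (node names and the failure cause are hypothetical):

```sh
kubectl cordon -l pool=batch
# error: unable to cordon node "node-1": <error from the API server>
# node "node-2" cordoned
# node "node-3" cordoned
```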

If at least one node fails to be drained, the command halts, printing
the error it encountered and a list of the nodes yet to be drained
(including the node that just failed).
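
For example, with three matching nodes (again hypothetical), a failure on the second one produces roughly:

```sh
kubectl drain -l pool=batch --ignore-daemonsets
# node "node-1" drained
# error: unable to drain node "node-2", aborting command...
#
# There are pending nodes to be drained:
#  node-2
#  node-3
```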

cc @kubernetes/sig-cli-misc @fabianofranz @soltysh
Kubernetes Submit Queue 2017-10-05 05:06:33 -07:00 committed by GitHub
commit 65cca36ebe
2 changed files with 152 additions and 52 deletions


@@ -4213,6 +4213,38 @@ run_certificates_tests() {
set +o errexit
}
run_cluster_management_tests() {
set -o nounset
set -o errexit
kube::log::status "Testing cluster-management commands"
kube::test::get_object_assert nodes "{{range.items}}{{$id_field}}:{{end}}" '127.0.0.1:'
### kubectl drain command fails when both --selector and a node argument are given
# Pre-condition: node exists and contains label test=label
kubectl label node "127.0.0.1" "test=label"
kube::test::get_object_assert "nodes 127.0.0.1" '{{.metadata.labels.test}}' 'label'
response=$(! kubectl drain "127.0.0.1" --selector test=label 2>&1)
kube::test::if_has_string "${response}" 'cannot specify both a node name'
### kubectl cordon command fails when no arguments are passed
# Pre-condition: node exists
response=$(! kubectl cordon 2>&1)
kube::test::if_has_string "${response}" 'error\: USAGE\: cordon NODE'
### kubectl cordon selects all nodes with an empty --selector=
# Pre-condition: node "127.0.0.1" is uncordoned
kubectl uncordon "127.0.0.1"
response=$(kubectl cordon --selector=)
kube::test::if_has_string "${response}" 'node "127.0.0.1" cordoned'
# Post-condition: node "127.0.0.1" is cordoned
kube::test::get_object_assert "nodes 127.0.0.1" "{{.spec.unschedulable}}" 'true'
set +o nounset
set +o errexit
}
run_plugins_tests() {
set -o nounset
set -o errexit
@@ -4803,6 +4835,13 @@ runTests() {
record_command run_certificates_tests
fi
######################
# Cluster Management #
######################
if kube::test::if_supports_resource "${nodes}" ; then
record_command run_cluster_management_tests
fi
###########
# Plugins #
###########


@@ -24,8 +24,6 @@ import (
"strings"
"time"
"k8s.io/apimachinery/pkg/util/json"
"github.com/jonboulle/clockwork"
"github.com/spf13/cobra"
@@ -36,6 +34,8 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
restclient "k8s.io/client-go/rest"
@@ -60,8 +60,9 @@ type DrainOptions struct {
Timeout time.Duration
backOff clockwork.Clock
DeleteLocalData bool
Selector string
mapper meta.RESTMapper
nodeInfo *resource.Info
nodeInfos []*resource.Info
Out io.Writer
ErrOut io.Writer
typer runtime.ObjectTyper
@@ -111,6 +112,7 @@ func NewCmdCordon(f cmdutil.Factory, out io.Writer) *cobra.Command {
cmdutil.CheckErr(options.RunCordonOrUncordon(true))
},
}
cmd.Flags().StringVarP(&options.Selector, "selector", "l", options.Selector, "Selector (label query) to filter on")
return cmd
}
@@ -136,6 +138,7 @@ func NewCmdUncordon(f cmdutil.Factory, out io.Writer) *cobra.Command {
cmdutil.CheckErr(options.RunCordonOrUncordon(false))
},
}
cmd.Flags().StringVarP(&options.Selector, "selector", "l", options.Selector, "Selector (label query) to filter on")
return cmd
}
@@ -191,6 +194,7 @@ func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")
cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
cmd.Flags().DurationVar(&options.Timeout, "timeout", 0, "The length of time to wait before giving up, zero means infinite")
cmd.Flags().StringVarP(&options.Selector, "selector", "l", options.Selector, "Selector (label query) to filter on")
return cmd
}
@@ -198,8 +202,16 @@ func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
// arguments and looks up the node using Builder
func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
var err error
if len(args) != 1 {
return cmdutil.UsageErrorf(cmd, "USAGE: %s [flags]", cmd.Use)
o.Selector = cmdutil.GetFlagString(cmd, "selector")
if len(args) == 0 && !cmd.Flags().Changed("selector") {
return cmdutil.UsageErrorf(cmd, fmt.Sprintf("USAGE: %s [flags]", cmd.Use))
}
if len(args) > 0 && len(o.Selector) > 0 {
return cmdutil.UsageErrorf(cmd, "error: cannot specify both a node name and a --selector option")
}
if len(args) > 0 && len(args) != 1 {
return cmdutil.UsageErrorf(cmd, fmt.Sprintf("USAGE: %s [flags]", cmd.Use))
}
if o.client, err = o.Factory.ClientSet(); err != nil {
@@ -211,6 +223,7 @@ func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
return err
}
o.nodeInfos = []*resource.Info{}
o.mapper, o.typer = o.Factory.Object()
cmdNamespace, _, err := o.Factory.DefaultNamespace()
@@ -218,9 +231,19 @@ func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
return err
}
nameArgs := []string{"nodes"}
if len(args) > 0 {
nameArgs = append(nameArgs, args[0])
if strings.Contains(args[0], "/") {
nameArgs = []string{args[0]}
}
}
r := o.Factory.NewBuilder().
NamespaceParam(cmdNamespace).DefaultNamespace().
ResourceNames("node", args[0]).
SelectorParam(o.Selector).
ResourceTypeOrNameArgs(true, nameArgs...).
Flatten().
Do()
if err = r.Err(); err != nil {
@@ -231,7 +254,7 @@ func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
o.nodeInfo = info
o.nodeInfos = append(o.nodeInfos, info)
return nil
})
}
@@ -242,26 +265,51 @@ func (o *DrainOptions) RunDrain() error {
return err
}
err := o.deleteOrEvictPodsSimple()
if err == nil {
cmdutil.PrintSuccess(o.mapper, false, o.Out, "node", o.nodeInfo.Name, false, "drained")
drainedNodes := sets.NewString()
var fatal error
for _, info := range o.nodeInfos {
err := o.deleteOrEvictPodsSimple(info)
if err == nil {
drainedNodes.Insert(info.Name)
cmdutil.PrintSuccess(o.mapper, false, o.Out, "node", info.Name, false, "drained")
} else {
fmt.Fprintf(o.ErrOut, "error: unable to drain node %q, aborting command...\n\n", info.Name)
remainingNodes := []string{}
fatal = err
for _, remainingInfo := range o.nodeInfos {
if drainedNodes.Has(remainingInfo.Name) {
continue
}
remainingNodes = append(remainingNodes, remainingInfo.Name)
}
if len(remainingNodes) > 0 {
fmt.Fprintf(o.ErrOut, "There are pending nodes to be drained:\n")
for _, nodeName := range remainingNodes {
fmt.Fprintf(o.ErrOut, " %s\n", nodeName)
}
}
break
}
}
return err
return fatal
}
func (o *DrainOptions) deleteOrEvictPodsSimple() error {
pods, err := o.getPodsForDeletion()
func (o *DrainOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error {
pods, err := o.getPodsForDeletion(nodeInfo)
if err != nil {
return err
}
err = o.deleteOrEvictPods(pods)
if err != nil {
pendingPods, newErr := o.getPodsForDeletion()
pendingPods, newErr := o.getPodsForDeletion(nodeInfo)
if newErr != nil {
return newErr
}
fmt.Fprintf(o.ErrOut, "There are pending pods when an error occurred: %v\n", err)
fmt.Fprintf(o.ErrOut, "There are pending pods in node %q when an error occurred: %v\n", nodeInfo.Name, err)
for _, pendingPod := range pendingPods {
fmt.Fprintf(o.ErrOut, "%s/%s\n", "pod", pendingPod.Name)
}
@@ -393,11 +441,11 @@ func (ps podStatuses) Message() string {
return strings.Join(msgs, "; ")
}
// getPodsForDeletion returns all the pods we're going to delete. If there are
// any pods preventing us from deleting, we return that list in an error.
func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
// getPodsForDeletion receives resource info for a node, and returns all the pods from the given node that we
// are planning on deleting. If there are any pods preventing us from deleting, we return that list in an error.
func (o *DrainOptions) getPodsForDeletion(nodeInfo *resource.Info) (pods []api.Pod, err error) {
podList, err := o.client.Core().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeInfo.Name}).String()})
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeInfo.Name}).String()})
if err != nil {
return pods, err
}
@@ -625,41 +673,54 @@ func (o *DrainOptions) RunCordonOrUncordon(desired bool) error {
return err
}
if o.nodeInfo.Mapping.GroupVersionKind.Kind == "Node" {
obj, err := o.nodeInfo.Mapping.ConvertToVersion(o.nodeInfo.Object, o.nodeInfo.Mapping.GroupVersionKind.GroupVersion())
if err != nil {
return err
}
oldData, err := json.Marshal(obj)
if err != nil {
return err
}
node, ok := obj.(*corev1.Node)
if !ok {
return fmt.Errorf("unexpected Type%T, expected Node", obj)
}
unsched := node.Spec.Unschedulable
if unsched == desired {
cmdutil.PrintSuccess(o.mapper, false, o.Out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, false, already(desired))
cordonOrUncordon := "cordon"
if !desired {
cordonOrUncordon = "un" + cordonOrUncordon
}
for _, nodeInfo := range o.nodeInfos {
if nodeInfo.Mapping.GroupVersionKind.Kind == "Node" {
obj, err := nodeInfo.Mapping.ConvertToVersion(nodeInfo.Object, nodeInfo.Mapping.GroupVersionKind.GroupVersion())
if err != nil {
fmt.Printf("error: unable to %s node %q: %v", cordonOrUncordon, nodeInfo.Name, err)
continue
}
oldData, err := json.Marshal(obj)
if err != nil {
fmt.Printf("error: unable to %s node %q: %v", cordonOrUncordon, nodeInfo.Name, err)
continue
}
node, ok := obj.(*corev1.Node)
if !ok {
fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: unexpected Type%T, expected Node", cordonOrUncordon, nodeInfo.Name, obj)
continue
}
unsched := node.Spec.Unschedulable
if unsched == desired {
cmdutil.PrintSuccess(o.mapper, false, o.Out, nodeInfo.Mapping.Resource, nodeInfo.Name, false, already(desired))
} else {
helper := resource.NewHelper(o.restClient, nodeInfo.Mapping)
node.Spec.Unschedulable = desired
newData, err := json.Marshal(obj)
if err != nil {
fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: %v", cordonOrUncordon, nodeInfo.Name, err)
continue
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, obj)
if err != nil {
fmt.Printf("error: unable to %s node %q: %v", cordonOrUncordon, nodeInfo.Name, err)
continue
}
_, err = helper.Patch(cmdNamespace, nodeInfo.Name, types.StrategicMergePatchType, patchBytes)
if err != nil {
fmt.Printf("error: unable to %s node %q: %v", cordonOrUncordon, nodeInfo.Name, err)
continue
}
cmdutil.PrintSuccess(o.mapper, false, o.Out, nodeInfo.Mapping.Resource, nodeInfo.Name, false, changed(desired))
}
} else {
helper := resource.NewHelper(o.restClient, o.nodeInfo.Mapping)
node.Spec.Unschedulable = desired
newData, err := json.Marshal(obj)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, obj)
if err != nil {
return err
}
_, err = helper.Patch(cmdNamespace, o.nodeInfo.Name, types.StrategicMergePatchType, patchBytes)
if err != nil {
return err
}
cmdutil.PrintSuccess(o.mapper, false, o.Out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, false, changed(desired))
cmdutil.PrintSuccess(o.mapper, false, o.Out, nodeInfo.Mapping.Resource, nodeInfo.Name, false, "skipped")
}
} else {
cmdutil.PrintSuccess(o.mapper, false, o.Out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, false, "skipped")
}
return nil