Fix kubectl drain for statefulset and use eviciton for drain if possible

pull/6/head
ymqytw 2016-10-20 09:43:48 -07:00
parent 0c7421fb51
commit b73fae6c55
18 changed files with 548 additions and 103 deletions

View File

@ -413,6 +413,17 @@ func IsInternalError(err error) bool {
return reasonForError(err) == unversioned.StatusReasonInternalError
}
// IsTooManyRequests determines if err is an error which indicates that there are too many requests
// that the server cannot handle.
// TODO: update IsTooManyRequests() when the TooManyRequests(429) error returned from the API server has a non-empty Reason field
func IsTooManyRequests(err error) bool {
switch t := err.(type) {
case APIStatus:
return t.Status().Code == StatusTooManyRequests
}
return false
}
// IsUnexpectedServerError returns true if the server response was not in the expected API format,
// and may be the result of another HTTP actor.
func IsUnexpectedServerError(err error) bool {

View File

@ -90,6 +90,9 @@ type PodDisruptionBudgetList struct {
Items []PodDisruptionBudget `json:"items"`
}
// +genclient=true
// +noMethods=true
// Eviction evicts a pod from its node subject to certain policies and safety constraints.
// This is a subresource of Pod. A request to cause such an eviction is
// created by POSTing to .../pods/<pod name>/eviction.

View File

@ -14,6 +14,8 @@ go_library(
name = "go_default_library",
srcs = [
"doc.go",
"eviction.go",
"eviction_expansion.go",
"generated_expansion.go",
"poddisruptionbudget.go",
"policy_client.go",

View File

@ -0,0 +1,46 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internalversion
import (
restclient "k8s.io/kubernetes/pkg/client/restclient"
)
// EvictionsGetter has a method to return a EvictionInterface.
// A group's client should implement this interface.
type EvictionsGetter interface {
Evictions(namespace string) EvictionInterface
}
// EvictionInterface has methods to work with Eviction resources.
type EvictionInterface interface {
EvictionExpansion
}
// evictions implements EvictionInterface
type evictions struct {
client restclient.Interface
ns string
}
// newEvictions returns a Evictions
func newEvictions(c *PolicyClient, namespace string) *evictions {
return &evictions{
client: c.RESTClient(),
ns: namespace,
}
}

View File

@ -0,0 +1,38 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internalversion
import (
policy "k8s.io/kubernetes/pkg/apis/policy"
)
// The EvictionExpansion interface allows manually adding extra methods to the ScaleInterface.
type EvictionExpansion interface {
Evict(eviction *policy.Eviction) error
}
func (c *evictions) Evict(eviction *policy.Eviction) error {
return c.client.Post().
AbsPath("/api/v1").
Namespace(eviction.Namespace).
Resource("pods").
Name(eviction.Name).
SubResource("eviction").
Body(eviction).
Do().
Error()
}

View File

@ -14,6 +14,8 @@ go_library(
name = "go_default_library",
srcs = [
"doc.go",
"fake_eviction.go",
"fake_eviction_expansion.go",
"fake_poddisruptionbudget.go",
"fake_policy_client.go",
],

View File

@ -0,0 +1,23 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
// FakeEvictions implements EvictionInterface
type FakeEvictions struct {
Fake *FakePolicy
ns string
}

View File

@ -0,0 +1,33 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
unversioned "k8s.io/kubernetes/pkg/api/unversioned"
policy "k8s.io/kubernetes/pkg/apis/policy"
core "k8s.io/kubernetes/pkg/client/testing/core"
)
func (c *FakeEvictions) Evict(eviction *policy.Eviction) error {
action := core.GetActionImpl{}
action.Verb = "post"
action.Namespace = c.ns
action.Resource = unversioned.GroupVersionResource{Group: "", Version: "", Resource: "pods"}
action.Subresource = "eviction"
_, err := c.Fake.Invokes(action, eviction)
return err
}

View File

@ -26,6 +26,10 @@ type FakePolicy struct {
*core.Fake
}
func (c *FakePolicy) Evictions(namespace string) internalversion.EvictionInterface {
return &FakeEvictions{c, namespace}
}
func (c *FakePolicy) PodDisruptionBudgets(namespace string) internalversion.PodDisruptionBudgetInterface {
return &FakePodDisruptionBudgets{c, namespace}
}

View File

@ -24,6 +24,7 @@ import (
type PolicyInterface interface {
RESTClient() restclient.Interface
EvictionsGetter
PodDisruptionBudgetsGetter
}
@ -32,6 +33,10 @@ type PolicyClient struct {
restClient restclient.Interface
}
func (c *PolicyClient) Evictions(namespace string) EvictionInterface {
return newEvictions(c, namespace)
}
func (c *PolicyClient) PodDisruptionBudgets(namespace string) PodDisruptionBudgetInterface {
return newPodDisruptionBudgets(c, namespace)
}

View File

@ -13,6 +13,7 @@ load(
go_library(
name = "go_default_library",
srcs = [
"eviction.go",
"expansion_generated.go",
"poddisruptionbudget.go",
],

View File

@ -0,0 +1,94 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file was automatically generated by lister-gen with arguments: --input-dirs=[k8s.io/kubernetes/pkg/api,k8s.io/kubernetes/pkg/api/v1,k8s.io/kubernetes/pkg/apis/abac,k8s.io/kubernetes/pkg/apis/abac/v0,k8s.io/kubernetes/pkg/apis/abac/v1beta1,k8s.io/kubernetes/pkg/apis/apps,k8s.io/kubernetes/pkg/apis/apps/v1beta1,k8s.io/kubernetes/pkg/apis/authentication,k8s.io/kubernetes/pkg/apis/authentication/v1beta1,k8s.io/kubernetes/pkg/apis/authorization,k8s.io/kubernetes/pkg/apis/authorization/v1beta1,k8s.io/kubernetes/pkg/apis/autoscaling,k8s.io/kubernetes/pkg/apis/autoscaling/v1,k8s.io/kubernetes/pkg/apis/batch,k8s.io/kubernetes/pkg/apis/batch/v1,k8s.io/kubernetes/pkg/apis/batch/v2alpha1,k8s.io/kubernetes/pkg/apis/certificates,k8s.io/kubernetes/pkg/apis/certificates/v1alpha1,k8s.io/kubernetes/pkg/apis/componentconfig,k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1,k8s.io/kubernetes/pkg/apis/extensions,k8s.io/kubernetes/pkg/apis/extensions/v1beta1,k8s.io/kubernetes/pkg/apis/imagepolicy,k8s.io/kubernetes/pkg/apis/imagepolicy/v1alpha1,k8s.io/kubernetes/pkg/apis/policy,k8s.io/kubernetes/pkg/apis/policy/v1alpha1,k8s.io/kubernetes/pkg/apis/policy/v1beta1,k8s.io/kubernetes/pkg/apis/rbac,k8s.io/kubernetes/pkg/apis/rbac/v1alpha1,k8s.io/kubernetes/pkg/apis/storage,k8s.io/kubernetes/pkg/apis/storage/v1beta1]
package internalversion
import (
"k8s.io/kubernetes/pkg/api/errors"
policy "k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"
)
// EvictionLister helps list Evictions.
type EvictionLister interface {
// List lists all Evictions in the indexer.
List(selector labels.Selector) (ret []*policy.Eviction, err error)
// Evictions returns an object that can list and get Evictions.
Evictions(namespace string) EvictionNamespaceLister
EvictionListerExpansion
}
// evictionLister implements the EvictionLister interface.
type evictionLister struct {
indexer cache.Indexer
}
// NewEvictionLister returns a new EvictionLister.
func NewEvictionLister(indexer cache.Indexer) EvictionLister {
return &evictionLister{indexer: indexer}
}
// List lists all Evictions in the indexer.
func (s *evictionLister) List(selector labels.Selector) (ret []*policy.Eviction, err error) {
err = cache.ListAll(s.indexer, selector, func(m interface{}) {
ret = append(ret, m.(*policy.Eviction))
})
return ret, err
}
// Evictions returns an object that can list and get Evictions.
func (s *evictionLister) Evictions(namespace string) EvictionNamespaceLister {
return evictionNamespaceLister{indexer: s.indexer, namespace: namespace}
}
// EvictionNamespaceLister helps list and get Evictions.
type EvictionNamespaceLister interface {
// List lists all Evictions in the indexer for a given namespace.
List(selector labels.Selector) (ret []*policy.Eviction, err error)
// Get retrieves the Eviction from the indexer for a given namespace and name.
Get(name string) (*policy.Eviction, error)
EvictionNamespaceListerExpansion
}
// evictionNamespaceLister implements the EvictionNamespaceLister
// interface.
type evictionNamespaceLister struct {
indexer cache.Indexer
namespace string
}
// List lists all Evictions in the indexer for a given namespace.
func (s evictionNamespaceLister) List(selector labels.Selector) (ret []*policy.Eviction, err error) {
err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*policy.Eviction))
})
return ret, err
}
// Get retrieves the Eviction from the indexer for a given namespace and name.
func (s evictionNamespaceLister) Get(name string) (*policy.Eviction, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(policy.Resource("eviction"), name)
}
return obj.(*policy.Eviction), nil
}

View File

@ -18,6 +18,14 @@ limitations under the License.
package internalversion
// EvictionListerExpansion allows custom methods to be added to
// EvictionLister.
type EvictionListerExpansion interface{}
// EvictionNamespaceListerExpansion allows custom methods to be added to
// EvictionNamespaeLister.
type EvictionNamespaceListerExpansion interface{}
// PodDisruptionBudgetListerExpansion allows custom methods to be added to
// PodDisruptionBudgetLister.
type PodDisruptionBudgetListerExpansion interface{}

View File

@ -70,6 +70,7 @@ go_library(
"//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/batch/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/policy:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/client/restclient:go_default_library",
@ -173,6 +174,7 @@ go_test(
"//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/policy:go_default_library",
"//pkg/client/restclient:go_default_library",
"//pkg/client/restclient/fake:go_default_library",
"//pkg/client/typed/dynamic:go_default_library",

View File

@ -255,7 +255,7 @@ func NewKubectlCommand(f cmdutil.Factory, in io.Reader, out, err io.Writer) *cob
NewCmdTop(f, out, err),
NewCmdCordon(f, out),
NewCmdUncordon(f, out),
NewCmdDrain(f, out),
NewCmdDrain(f, out, err),
NewCmdTaint(f, out),
},
},

View File

@ -18,6 +18,7 @@ package cmd
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
@ -28,6 +29,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/client/restclient"
@ -110,6 +112,14 @@ func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj))))
}
func policyObjBody(obj runtime.Object) io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(testapi.Policy.Codec(), obj))))
}
func bytesBody(bodyBytes []byte) io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader(bodyBytes))
}
func stringBody(body string) io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader([]byte(body)))
}
@ -616,3 +626,11 @@ func TestNormalizationFuncGlobalExistence(t *testing.T) {
t.Fatal("child and root commands should have the same normalization functions")
}
}
func genResponseWithJsonEncodedBody(bodyStruct interface{}) (*http.Response, error) {
jsonBytes, err := json.Marshal(bodyStruct)
if err != nil {
return nil, err
}
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bytesBody(jsonBytes)}, nil
}

View File

@ -20,15 +20,19 @@ import (
"errors"
"fmt"
"io"
"math"
"reflect"
"strings"
"time"
"github.com/jonboulle/clockwork"
"github.com/spf13/cobra"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/fields"
@ -49,10 +53,12 @@ type DrainOptions struct {
GracePeriodSeconds int
IgnoreDaemonsets bool
Timeout time.Duration
backOff clockwork.Clock
DeleteLocalData bool
mapper meta.RESTMapper
nodeInfo *resource.Info
out io.Writer
errOut io.Writer
typer runtime.ObjectTyper
}
@ -67,6 +73,9 @@ type fatal struct {
}
const (
EvictionKind = "Eviction"
EvictionSubresource = "pods/eviction"
kDaemonsetFatal = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)"
kDaemonsetWarning = "Ignoring DaemonSet-managed pods"
kLocalStorageFatal = "pods with local storage (use --delete-local-data to override)"
@ -152,8 +161,8 @@ var (
$ kubectl drain foo --grace-period=900`)
)
func NewCmdDrain(f cmdutil.Factory, out io.Writer) *cobra.Command {
options := &DrainOptions{factory: f, out: out}
func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
options := &DrainOptions{factory: f, out: out, errOut: errOut, backOff: clockwork.NewRealClock()}
cmd := &cobra.Command{
Use: "drain NODE",
@ -221,16 +230,43 @@ func (o *DrainOptions) RunDrain() error {
return err
}
err := o.deleteOrEvictPodsSimple()
// TODO: update IsTooManyRequests() when the TooManyRequests(429) error returned from the API server has a non-empty Reason field
for i := 1; i <= maxPatchRetry && apierrors.IsTooManyRequests(err); i++ {
if i > triesBeforeBackOff {
currBackOffPeriod := time.Duration(math.Exp2(float64(i-triesBeforeBackOff))) * backOffPeriod
fmt.Fprintf(o.errOut, "Retry in %v\n", currBackOffPeriod)
o.backOff.Sleep(currBackOffPeriod)
}
fmt.Fprintf(o.errOut, "Retrying\n")
err = o.deleteOrEvictPodsSimple()
}
if err == nil {
cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, false, "drained")
}
return err
}
func (o *DrainOptions) deleteOrEvictPodsSimple() error {
pods, err := o.getPodsForDeletion()
if err != nil {
return err
}
if err = o.deletePods(pods); err != nil {
return err
if o.Timeout == 0 {
o.Timeout = kubectl.Timeout + time.Duration(10*len(pods))*time.Second
}
cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, false, "drained")
return nil
err = o.deleteOrEvictPods(pods)
if err != nil {
pendingPods, newErr := o.getPodsForDeletion()
if newErr != nil {
return newErr
}
fmt.Fprintf(o.errOut, "There are pending pods when an error occurred: %v\n", err)
for _, pendingPod := range pendingPods {
fmt.Fprintf(o.errOut, "%s/%s\n", "pod", pendingPod.Name)
}
}
return err
}
func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) {
@ -243,6 +279,8 @@ func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{},
return o.client.Batch().Jobs(sr.Reference.Namespace).Get(sr.Reference.Name)
case "ReplicaSet":
return o.client.Extensions().ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name)
case "StatefulSet":
return o.client.Apps().StatefulSets(sr.Reference.Namespace).Get(sr.Reference.Name)
}
return nil, fmt.Errorf("Unknown controller kind %q", sr.Reference.Kind)
}
@ -252,7 +290,6 @@ func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, err
if !found {
return nil, nil
}
// Now verify that the specified creator actually exists.
sr := &api.SerializedReference{}
if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil {
@ -380,21 +417,58 @@ func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
return []api.Pod{}, errors.New(fs.Message())
}
if len(ws) > 0 {
fmt.Fprintf(o.out, "WARNING: %s\n", ws.Message())
fmt.Fprintf(o.errOut, "WARNING: %s\n", ws.Message())
}
return pods, nil
}
// deletePods deletes the pods on the api server
func (o *DrainOptions) deletePods(pods []api.Pod) error {
deleteOptions := api.DeleteOptions{}
func (o *DrainOptions) deletePod(pod api.Pod) error {
deleteOptions := &api.DeleteOptions{}
if o.GracePeriodSeconds >= 0 {
gracePeriodSeconds := int64(o.GracePeriodSeconds)
deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
}
return o.client.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
}
func (o *DrainOptions) evictPod(pod api.Pod, policyGroupVersion string) error {
deleteOptions := &api.DeleteOptions{}
if o.GracePeriodSeconds >= 0 {
gracePeriodSeconds := int64(o.GracePeriodSeconds)
deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
}
eviction := &policy.Eviction{
TypeMeta: unversioned.TypeMeta{
APIVersion: policyGroupVersion,
Kind: EvictionKind,
},
ObjectMeta: api.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
DeleteOptions: deleteOptions,
}
// Remember to change change the URL manipulation func when Evction's version change
return o.client.Policy().Evictions(eviction.Namespace).Evict(eviction)
}
// deleteOrEvictPods deletes or evicts the pods on the api server
func (o *DrainOptions) deleteOrEvictPods(pods []api.Pod) error {
if len(pods) == 0 {
return nil
}
policyGroupVersion, err := SupportEviction(o.client)
if err != nil {
return err
}
for _, pod := range pods {
err := o.client.Core().Pods(pod.Namespace).Delete(pod.Name, &deleteOptions)
if len(policyGroupVersion) > 0 {
err = o.evictPod(pod, policyGroupVersion)
} else {
err = o.deletePod(pod)
}
if err != nil {
return err
}
@ -403,17 +477,11 @@ func (o *DrainOptions) deletePods(pods []api.Pod) error {
getPodFn := func(namespace, name string) (*api.Pod, error) {
return o.client.Core().Pods(namespace).Get(name)
}
pendingPods, err := o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn)
if err != nil {
fmt.Fprintf(o.out, "There are pending pods when an error occured:\n")
for _, pendindPod := range pendingPods {
cmdutil.PrintSuccess(o.mapper, true, o.out, "pod", pendindPod.Name, false, "")
}
}
_, err = o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn)
return err
}
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(namespace, name string) (*api.Pod, error)) ([]api.Pod, error) {
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
pendingPods := []api.Pod{}
for i, pod := range pods {
@ -436,6 +504,38 @@ func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Dura
return pods, err
}
// SupportEviction uses Discovery API to find out if the server support eviction subresource
// If support, it will return its groupVersion; Otherwise, it will return ""
func SupportEviction(clientset *internalclientset.Clientset) (string, error) {
discoveryClient := clientset.Discovery()
groupList, err := discoveryClient.ServerGroups()
if err != nil {
return "", err
}
foundPolicyGroup := false
var policyGroupVersion string
for _, group := range groupList.Groups {
if group.Name == "policy" {
foundPolicyGroup = true
policyGroupVersion = group.PreferredVersion.GroupVersion
break
}
}
if !foundPolicyGroup {
return "", nil
}
resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
if err != nil {
return "", err
}
for _, resource := range resourceList.APIResources {
if resource.Name == EvictionSubresource && resource.Kind == EvictionKind {
return policyGroupVersion, nil
}
}
return "", nil
}
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
// "Unschedulable" is passed as the first arg.
func (o *DrainOptions) RunCordonOrUncordon(desired bool) error {

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/client/restclient/fake"
"k8s.io/kubernetes/pkg/conversion"
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
@ -48,6 +49,11 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
)
const (
EvictionMethod = "Eviction"
DeleteMethod = "Delete"
)
var node *api.Node
var cordoned_node *api.Node
@ -463,95 +469,144 @@ func TestDrain(t *testing.T) {
},
}
for _, test := range tests {
new_node := &api.Node{}
deleted := false
f, tf, codec, ns := cmdtesting.NewAPIFactory()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
m := &MyReq{req}
switch {
case m.isFor("GET", "/nodes/node"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, test.node)}, nil
case m.isFor("GET", "/namespaces/default/replicationcontrollers/rc"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &test.rcs[0])}, nil
case m.isFor("GET", "/namespaces/default/daemonsets/ds"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &ds)}, nil
case m.isFor("GET", "/namespaces/default/jobs/job"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil
case m.isFor("GET", "/namespaces/default/replicasets/rs"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil
case m.isFor("GET", "/namespaces/default/pods/bar"):
return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, nil)}, nil
case m.isFor("GET", "/pods"):
values, err := url.ParseQuery(req.URL.RawQuery)
if err != nil {
t.Fatalf("%s: unexpected error: %v", test.description, err)
}
get_params := make(url.Values)
get_params["fieldSelector"] = []string{"spec.nodeName=node"}
if !reflect.DeepEqual(get_params, values) {
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, get_params, values)
}
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: test.pods})}, nil
case m.isFor("GET", "/replicationcontrollers"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.ReplicationControllerList{Items: test.rcs})}, nil
case m.isFor("PUT", "/nodes/node"):
data, err := ioutil.ReadAll(req.Body)
if err != nil {
t.Fatalf("%s: unexpected error: %v", test.description, err)
}
defer req.Body.Close()
if err := runtime.DecodeInto(codec, data, new_node); err != nil {
t.Fatalf("%s: unexpected error: %v", test.description, err)
}
if !reflect.DeepEqual(test.expected.Spec, new_node.Spec) {
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, test.expected.Spec, new_node.Spec)
}
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, new_node)}, nil
case m.isFor("DELETE", "/namespaces/default/pods/bar"):
deleted = true
return &http.Response{StatusCode: 204, Header: defaultHeader(), Body: objBody(codec, &test.pods[0])}, nil
default:
t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req)
return nil, nil
}
}),
testEviction := false
for i := 0; i < 2; i++ {
testEviction = !testEviction
var currMethod string
if testEviction {
currMethod = EvictionMethod
} else {
currMethod = DeleteMethod
}
tf.ClientConfig = defaultClientConfig()
for _, test := range tests {
new_node := &api.Node{}
deleted := false
evicted := false
f, tf, codec, ns := cmdtesting.NewAPIFactory()
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
m := &MyReq{req}
switch {
case req.Method == "GET" && req.URL.Path == "/api":
apiVersions := unversioned.APIVersions{
Versions: []string{"v1"},
}
return genResponseWithJsonEncodedBody(apiVersions)
case req.Method == "GET" && req.URL.Path == "/apis":
groupList := unversioned.APIGroupList{
Groups: []unversioned.APIGroup{
{
Name: "policy",
PreferredVersion: unversioned.GroupVersionForDiscovery{
GroupVersion: "policy/v1beta1",
},
},
},
}
return genResponseWithJsonEncodedBody(groupList)
case req.Method == "GET" && req.URL.Path == "/api/v1":
resourceList := unversioned.APIResourceList{
GroupVersion: "v1",
}
if testEviction {
resourceList.APIResources = []unversioned.APIResource{
{
Name: EvictionSubresource,
Kind: EvictionKind,
},
}
}
return genResponseWithJsonEncodedBody(resourceList)
case m.isFor("GET", "/nodes/node"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, test.node)}, nil
case m.isFor("GET", "/namespaces/default/replicationcontrollers/rc"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &test.rcs[0])}, nil
case m.isFor("GET", "/namespaces/default/daemonsets/ds"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &ds)}, nil
case m.isFor("GET", "/namespaces/default/jobs/job"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil
case m.isFor("GET", "/namespaces/default/replicasets/rs"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil
case m.isFor("GET", "/namespaces/default/pods/bar"):
return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, &api.Pod{})}, nil
case m.isFor("GET", "/pods"):
values, err := url.ParseQuery(req.URL.RawQuery)
if err != nil {
t.Fatalf("%s: unexpected error: %v", test.description, err)
}
get_params := make(url.Values)
get_params["fieldSelector"] = []string{"spec.nodeName=node"}
if !reflect.DeepEqual(get_params, values) {
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, get_params, values)
}
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: test.pods})}, nil
case m.isFor("GET", "/replicationcontrollers"):
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.ReplicationControllerList{Items: test.rcs})}, nil
case m.isFor("PUT", "/nodes/node"):
data, err := ioutil.ReadAll(req.Body)
if err != nil {
t.Fatalf("%s: unexpected error: %v", test.description, err)
}
defer req.Body.Close()
if err := runtime.DecodeInto(codec, data, new_node); err != nil {
t.Fatalf("%s: unexpected error: %v", test.description, err)
}
if !reflect.DeepEqual(test.expected.Spec, new_node.Spec) {
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, test.expected.Spec, new_node.Spec)
}
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, new_node)}, nil
case m.isFor("DELETE", "/namespaces/default/pods/bar"):
deleted = true
return &http.Response{StatusCode: 204, Header: defaultHeader(), Body: objBody(codec, &test.pods[0])}, nil
case m.isFor("POST", "/namespaces/default/pods/bar/eviction"):
evicted = true
return &http.Response{StatusCode: 201, Header: defaultHeader(), Body: policyObjBody(&policy.Eviction{})}, nil
default:
t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req)
return nil, nil
}
}),
}
tf.ClientConfig = defaultClientConfig()
buf := bytes.NewBuffer([]byte{})
cmd := NewCmdDrain(f, buf)
buf := bytes.NewBuffer([]byte{})
errBuf := bytes.NewBuffer([]byte{})
cmd := NewCmdDrain(f, buf, errBuf)
saw_fatal := false
func() {
defer func() {
// Recover from the panic below.
_ = recover()
// Restore cmdutil behavior
cmdutil.DefaultBehaviorOnFatal()
saw_fatal := false
func() {
defer func() {
// Recover from the panic below.
_ = recover()
// Restore cmdutil behavior
cmdutil.DefaultBehaviorOnFatal()
}()
cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) })
cmd.SetArgs(test.args)
cmd.Execute()
}()
cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) })
cmd.SetArgs(test.args)
cmd.Execute()
}()
if test.expectFatal {
if !saw_fatal {
t.Fatalf("%s: unexpected non-error", test.description)
if test.expectFatal {
if !saw_fatal {
t.Fatalf("%s: unexpected non-error when using %s", test.description, currMethod)
}
}
}
if test.expectDelete {
if !deleted {
t.Fatalf("%s: pod never deleted", test.description)
if test.expectDelete {
// Test Delete
if !testEviction && !deleted {
t.Fatalf("%s: pod never deleted", test.description)
}
// Test Eviction
if testEviction && !evicted {
t.Fatalf("%s: pod never evicted", test.description)
}
}
}
if !test.expectDelete {
if deleted {
t.Fatalf("%s: unexpected delete", test.description)
if !test.expectDelete {
if deleted {
t.Fatalf("%s: unexpected delete when using %s", test.description, currMethod)
}
}
}
}