From b73fae6c5594c26fb28fca397ef82c663f1e4f83 Mon Sep 17 00:00:00 2001 From: ymqytw Date: Thu, 20 Oct 2016 09:43:48 -0700 Subject: [PATCH] Fix kubectl drain for statefulset and use eviciton for drain if possible --- pkg/api/errors/errors.go | 11 + pkg/apis/policy/types.go | 3 + .../typed/policy/internalversion/BUILD | 2 + .../typed/policy/internalversion/eviction.go | 46 ++++ .../internalversion/eviction_expansion.go | 38 +++ .../typed/policy/internalversion/fake/BUILD | 2 + .../internalversion/fake/fake_eviction.go | 23 ++ .../fake/fake_eviction_expansion.go | 33 +++ .../fake/fake_policy_client.go | 4 + .../policy/internalversion/policy_client.go | 5 + .../listers/policy/internalversion/BUILD | 1 + .../policy/internalversion/eviction.go | 94 ++++++++ .../internalversion/expansion_generated.go | 8 + pkg/kubectl/cmd/BUILD | 2 + pkg/kubectl/cmd/cmd.go | 2 +- pkg/kubectl/cmd/cmd_test.go | 18 ++ pkg/kubectl/cmd/drain.go | 142 ++++++++++-- pkg/kubectl/cmd/drain_test.go | 217 +++++++++++------- 18 files changed, 548 insertions(+), 103 deletions(-) create mode 100644 pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/eviction.go create mode 100644 pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/eviction_expansion.go create mode 100644 pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_eviction.go create mode 100644 pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_eviction_expansion.go create mode 100644 pkg/client/listers/policy/internalversion/eviction.go diff --git a/pkg/api/errors/errors.go b/pkg/api/errors/errors.go index 8623b3aab4..f54823ffcc 100644 --- a/pkg/api/errors/errors.go +++ b/pkg/api/errors/errors.go @@ -413,6 +413,17 @@ func IsInternalError(err error) bool { return reasonForError(err) == unversioned.StatusReasonInternalError } +// IsTooManyRequests determines if err is an error which indicates that there are too many requests +// that the server cannot handle. +// TODO: update IsTooManyRequests() when the TooManyRequests(429) error returned from the API server has a non-empty Reason field +func IsTooManyRequests(err error) bool { + switch t := err.(type) { + case APIStatus: + return t.Status().Code == StatusTooManyRequests + } + return false +} + // IsUnexpectedServerError returns true if the server response was not in the expected API format, // and may be the result of another HTTP actor. func IsUnexpectedServerError(err error) bool { diff --git a/pkg/apis/policy/types.go b/pkg/apis/policy/types.go index 6104b2fce8..1255c322ce 100644 --- a/pkg/apis/policy/types.go +++ b/pkg/apis/policy/types.go @@ -90,6 +90,9 @@ type PodDisruptionBudgetList struct { Items []PodDisruptionBudget `json:"items"` } +// +genclient=true +// +noMethods=true + // Eviction evicts a pod from its node subject to certain policies and safety constraints. // This is a subresource of Pod. A request to cause such an eviction is // created by POSTing to .../pods//eviction. diff --git a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/BUILD b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/BUILD index 5c47ac90f8..a3f783f2a3 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/BUILD +++ b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/BUILD @@ -14,6 +14,8 @@ go_library( name = "go_default_library", srcs = [ "doc.go", + "eviction.go", + "eviction_expansion.go", "generated_expansion.go", "poddisruptionbudget.go", "policy_client.go", diff --git a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/eviction.go b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/eviction.go new file mode 100644 index 0000000000..b16a393437 --- /dev/null +++ b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/eviction.go @@ -0,0 +1,46 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internalversion + +import ( + restclient "k8s.io/kubernetes/pkg/client/restclient" +) + +// EvictionsGetter has a method to return a EvictionInterface. +// A group's client should implement this interface. +type EvictionsGetter interface { + Evictions(namespace string) EvictionInterface +} + +// EvictionInterface has methods to work with Eviction resources. +type EvictionInterface interface { + EvictionExpansion +} + +// evictions implements EvictionInterface +type evictions struct { + client restclient.Interface + ns string +} + +// newEvictions returns a Evictions +func newEvictions(c *PolicyClient, namespace string) *evictions { + return &evictions{ + client: c.RESTClient(), + ns: namespace, + } +} diff --git a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/eviction_expansion.go b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/eviction_expansion.go new file mode 100644 index 0000000000..8e2030101b --- /dev/null +++ b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/eviction_expansion.go @@ -0,0 +1,38 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internalversion + +import ( + policy "k8s.io/kubernetes/pkg/apis/policy" +) + +// The EvictionExpansion interface allows manually adding extra methods to the ScaleInterface. +type EvictionExpansion interface { + Evict(eviction *policy.Eviction) error +} + +func (c *evictions) Evict(eviction *policy.Eviction) error { + return c.client.Post(). + AbsPath("/api/v1"). + Namespace(eviction.Namespace). + Resource("pods"). + Name(eviction.Name). + SubResource("eviction"). + Body(eviction). + Do(). + Error() +} diff --git a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/BUILD b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/BUILD index d7627220e8..38d87c65e6 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/BUILD +++ b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/BUILD @@ -14,6 +14,8 @@ go_library( name = "go_default_library", srcs = [ "doc.go", + "fake_eviction.go", + "fake_eviction_expansion.go", "fake_poddisruptionbudget.go", "fake_policy_client.go", ], diff --git a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_eviction.go b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_eviction.go new file mode 100644 index 0000000000..70b7ebc56a --- /dev/null +++ b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_eviction.go @@ -0,0 +1,23 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +// FakeEvictions implements EvictionInterface +type FakeEvictions struct { + Fake *FakePolicy + ns string +} diff --git a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_eviction_expansion.go b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_eviction_expansion.go new file mode 100644 index 0000000000..ff6093872f --- /dev/null +++ b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_eviction_expansion.go @@ -0,0 +1,33 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + unversioned "k8s.io/kubernetes/pkg/api/unversioned" + policy "k8s.io/kubernetes/pkg/apis/policy" + core "k8s.io/kubernetes/pkg/client/testing/core" +) + +func (c *FakeEvictions) Evict(eviction *policy.Eviction) error { + action := core.GetActionImpl{} + action.Verb = "post" + action.Namespace = c.ns + action.Resource = unversioned.GroupVersionResource{Group: "", Version: "", Resource: "pods"} + action.Subresource = "eviction" + _, err := c.Fake.Invokes(action, eviction) + return err +} diff --git a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_policy_client.go b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_policy_client.go index 4eba902cfd..c277e61f42 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_policy_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/fake/fake_policy_client.go @@ -26,6 +26,10 @@ type FakePolicy struct { *core.Fake } +func (c *FakePolicy) Evictions(namespace string) internalversion.EvictionInterface { + return &FakeEvictions{c, namespace} +} + func (c *FakePolicy) PodDisruptionBudgets(namespace string) internalversion.PodDisruptionBudgetInterface { return &FakePodDisruptionBudgets{c, namespace} } diff --git a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/policy_client.go b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/policy_client.go index a3804959d0..b968424b00 100644 --- a/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/policy_client.go +++ b/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion/policy_client.go @@ -24,6 +24,7 @@ import ( type PolicyInterface interface { RESTClient() restclient.Interface + EvictionsGetter PodDisruptionBudgetsGetter } @@ -32,6 +33,10 @@ type PolicyClient struct { restClient restclient.Interface } +func (c *PolicyClient) Evictions(namespace string) EvictionInterface { + return newEvictions(c, namespace) +} + func (c *PolicyClient) PodDisruptionBudgets(namespace string) PodDisruptionBudgetInterface { return newPodDisruptionBudgets(c, namespace) } diff --git a/pkg/client/listers/policy/internalversion/BUILD b/pkg/client/listers/policy/internalversion/BUILD index 7deeda7765..8d39b8b062 100644 --- a/pkg/client/listers/policy/internalversion/BUILD +++ b/pkg/client/listers/policy/internalversion/BUILD @@ -13,6 +13,7 @@ load( go_library( name = "go_default_library", srcs = [ + "eviction.go", "expansion_generated.go", "poddisruptionbudget.go", ], diff --git a/pkg/client/listers/policy/internalversion/eviction.go b/pkg/client/listers/policy/internalversion/eviction.go new file mode 100644 index 0000000000..fd307f9fb0 --- /dev/null +++ b/pkg/client/listers/policy/internalversion/eviction.go @@ -0,0 +1,94 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file was automatically generated by lister-gen with arguments: --input-dirs=[k8s.io/kubernetes/pkg/api,k8s.io/kubernetes/pkg/api/v1,k8s.io/kubernetes/pkg/apis/abac,k8s.io/kubernetes/pkg/apis/abac/v0,k8s.io/kubernetes/pkg/apis/abac/v1beta1,k8s.io/kubernetes/pkg/apis/apps,k8s.io/kubernetes/pkg/apis/apps/v1beta1,k8s.io/kubernetes/pkg/apis/authentication,k8s.io/kubernetes/pkg/apis/authentication/v1beta1,k8s.io/kubernetes/pkg/apis/authorization,k8s.io/kubernetes/pkg/apis/authorization/v1beta1,k8s.io/kubernetes/pkg/apis/autoscaling,k8s.io/kubernetes/pkg/apis/autoscaling/v1,k8s.io/kubernetes/pkg/apis/batch,k8s.io/kubernetes/pkg/apis/batch/v1,k8s.io/kubernetes/pkg/apis/batch/v2alpha1,k8s.io/kubernetes/pkg/apis/certificates,k8s.io/kubernetes/pkg/apis/certificates/v1alpha1,k8s.io/kubernetes/pkg/apis/componentconfig,k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1,k8s.io/kubernetes/pkg/apis/extensions,k8s.io/kubernetes/pkg/apis/extensions/v1beta1,k8s.io/kubernetes/pkg/apis/imagepolicy,k8s.io/kubernetes/pkg/apis/imagepolicy/v1alpha1,k8s.io/kubernetes/pkg/apis/policy,k8s.io/kubernetes/pkg/apis/policy/v1alpha1,k8s.io/kubernetes/pkg/apis/policy/v1beta1,k8s.io/kubernetes/pkg/apis/rbac,k8s.io/kubernetes/pkg/apis/rbac/v1alpha1,k8s.io/kubernetes/pkg/apis/storage,k8s.io/kubernetes/pkg/apis/storage/v1beta1] + +package internalversion + +import ( + "k8s.io/kubernetes/pkg/api/errors" + policy "k8s.io/kubernetes/pkg/apis/policy" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/labels" +) + +// EvictionLister helps list Evictions. +type EvictionLister interface { + // List lists all Evictions in the indexer. + List(selector labels.Selector) (ret []*policy.Eviction, err error) + // Evictions returns an object that can list and get Evictions. + Evictions(namespace string) EvictionNamespaceLister + EvictionListerExpansion +} + +// evictionLister implements the EvictionLister interface. +type evictionLister struct { + indexer cache.Indexer +} + +// NewEvictionLister returns a new EvictionLister. +func NewEvictionLister(indexer cache.Indexer) EvictionLister { + return &evictionLister{indexer: indexer} +} + +// List lists all Evictions in the indexer. +func (s *evictionLister) List(selector labels.Selector) (ret []*policy.Eviction, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*policy.Eviction)) + }) + return ret, err +} + +// Evictions returns an object that can list and get Evictions. +func (s *evictionLister) Evictions(namespace string) EvictionNamespaceLister { + return evictionNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// EvictionNamespaceLister helps list and get Evictions. +type EvictionNamespaceLister interface { + // List lists all Evictions in the indexer for a given namespace. + List(selector labels.Selector) (ret []*policy.Eviction, err error) + // Get retrieves the Eviction from the indexer for a given namespace and name. + Get(name string) (*policy.Eviction, error) + EvictionNamespaceListerExpansion +} + +// evictionNamespaceLister implements the EvictionNamespaceLister +// interface. +type evictionNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all Evictions in the indexer for a given namespace. +func (s evictionNamespaceLister) List(selector labels.Selector) (ret []*policy.Eviction, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*policy.Eviction)) + }) + return ret, err +} + +// Get retrieves the Eviction from the indexer for a given namespace and name. +func (s evictionNamespaceLister) Get(name string) (*policy.Eviction, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(policy.Resource("eviction"), name) + } + return obj.(*policy.Eviction), nil +} diff --git a/pkg/client/listers/policy/internalversion/expansion_generated.go b/pkg/client/listers/policy/internalversion/expansion_generated.go index cbf579c93c..3c3fc2262d 100644 --- a/pkg/client/listers/policy/internalversion/expansion_generated.go +++ b/pkg/client/listers/policy/internalversion/expansion_generated.go @@ -18,6 +18,14 @@ limitations under the License. package internalversion +// EvictionListerExpansion allows custom methods to be added to +// EvictionLister. +type EvictionListerExpansion interface{} + +// EvictionNamespaceListerExpansion allows custom methods to be added to +// EvictionNamespaeLister. +type EvictionNamespaceListerExpansion interface{} + // PodDisruptionBudgetListerExpansion allows custom methods to be added to // PodDisruptionBudgetLister. type PodDisruptionBudgetListerExpansion interface{} diff --git a/pkg/kubectl/cmd/BUILD b/pkg/kubectl/cmd/BUILD index b186ab0dae..e0eb83dc3b 100644 --- a/pkg/kubectl/cmd/BUILD +++ b/pkg/kubectl/cmd/BUILD @@ -70,6 +70,7 @@ go_library( "//pkg/apimachinery/registered:go_default_library", "//pkg/apis/batch/v1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", + "//pkg/apis/policy:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", "//pkg/client/restclient:go_default_library", @@ -173,6 +174,7 @@ go_test( "//pkg/apimachinery/registered:go_default_library", "//pkg/apis/batch:go_default_library", "//pkg/apis/extensions:go_default_library", + "//pkg/apis/policy:go_default_library", "//pkg/client/restclient:go_default_library", "//pkg/client/restclient/fake:go_default_library", "//pkg/client/typed/dynamic:go_default_library", diff --git a/pkg/kubectl/cmd/cmd.go b/pkg/kubectl/cmd/cmd.go index f5b131a641..4b5bd32e13 100644 --- a/pkg/kubectl/cmd/cmd.go +++ b/pkg/kubectl/cmd/cmd.go @@ -255,7 +255,7 @@ func NewKubectlCommand(f cmdutil.Factory, in io.Reader, out, err io.Writer) *cob NewCmdTop(f, out, err), NewCmdCordon(f, out), NewCmdUncordon(f, out), - NewCmdDrain(f, out), + NewCmdDrain(f, out, err), NewCmdTaint(f, out), }, }, diff --git a/pkg/kubectl/cmd/cmd_test.go b/pkg/kubectl/cmd/cmd_test.go index a0998d8a1d..d5a3bdae56 100644 --- a/pkg/kubectl/cmd/cmd_test.go +++ b/pkg/kubectl/cmd/cmd_test.go @@ -18,6 +18,7 @@ package cmd import ( "bytes" + "encoding/json" "fmt" "io" "io/ioutil" @@ -28,6 +29,7 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/client/restclient" @@ -110,6 +112,14 @@ func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser { return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj)))) } +func policyObjBody(obj runtime.Object) io.ReadCloser { + return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(testapi.Policy.Codec(), obj)))) +} + +func bytesBody(bodyBytes []byte) io.ReadCloser { + return ioutil.NopCloser(bytes.NewReader(bodyBytes)) +} + func stringBody(body string) io.ReadCloser { return ioutil.NopCloser(bytes.NewReader([]byte(body))) } @@ -616,3 +626,11 @@ func TestNormalizationFuncGlobalExistence(t *testing.T) { t.Fatal("child and root commands should have the same normalization functions") } } + +func genResponseWithJsonEncodedBody(bodyStruct interface{}) (*http.Response, error) { + jsonBytes, err := json.Marshal(bodyStruct) + if err != nil { + return nil, err + } + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bytesBody(jsonBytes)}, nil +} diff --git a/pkg/kubectl/cmd/drain.go b/pkg/kubectl/cmd/drain.go index e29521b2ff..569a8160f4 100644 --- a/pkg/kubectl/cmd/drain.go +++ b/pkg/kubectl/cmd/drain.go @@ -20,15 +20,19 @@ import ( "errors" "fmt" "io" + "math" "reflect" "strings" "time" + "github.com/jonboulle/clockwork" "github.com/spf13/cobra" "k8s.io/kubernetes/pkg/api" apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/policy" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/fields" @@ -49,10 +53,12 @@ type DrainOptions struct { GracePeriodSeconds int IgnoreDaemonsets bool Timeout time.Duration + backOff clockwork.Clock DeleteLocalData bool mapper meta.RESTMapper nodeInfo *resource.Info out io.Writer + errOut io.Writer typer runtime.ObjectTyper } @@ -67,6 +73,9 @@ type fatal struct { } const ( + EvictionKind = "Eviction" + EvictionSubresource = "pods/eviction" + kDaemonsetFatal = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)" kDaemonsetWarning = "Ignoring DaemonSet-managed pods" kLocalStorageFatal = "pods with local storage (use --delete-local-data to override)" @@ -152,8 +161,8 @@ var ( $ kubectl drain foo --grace-period=900`) ) -func NewCmdDrain(f cmdutil.Factory, out io.Writer) *cobra.Command { - options := &DrainOptions{factory: f, out: out} +func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command { + options := &DrainOptions{factory: f, out: out, errOut: errOut, backOff: clockwork.NewRealClock()} cmd := &cobra.Command{ Use: "drain NODE", @@ -221,16 +230,43 @@ func (o *DrainOptions) RunDrain() error { return err } + err := o.deleteOrEvictPodsSimple() + // TODO: update IsTooManyRequests() when the TooManyRequests(429) error returned from the API server has a non-empty Reason field + for i := 1; i <= maxPatchRetry && apierrors.IsTooManyRequests(err); i++ { + if i > triesBeforeBackOff { + currBackOffPeriod := time.Duration(math.Exp2(float64(i-triesBeforeBackOff))) * backOffPeriod + fmt.Fprintf(o.errOut, "Retry in %v\n", currBackOffPeriod) + o.backOff.Sleep(currBackOffPeriod) + } + fmt.Fprintf(o.errOut, "Retrying\n") + err = o.deleteOrEvictPodsSimple() + } + if err == nil { + cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, false, "drained") + } + return err +} + +func (o *DrainOptions) deleteOrEvictPodsSimple() error { pods, err := o.getPodsForDeletion() if err != nil { return err } - - if err = o.deletePods(pods); err != nil { - return err + if o.Timeout == 0 { + o.Timeout = kubectl.Timeout + time.Duration(10*len(pods))*time.Second } - cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, false, "drained") - return nil + err = o.deleteOrEvictPods(pods) + if err != nil { + pendingPods, newErr := o.getPodsForDeletion() + if newErr != nil { + return newErr + } + fmt.Fprintf(o.errOut, "There are pending pods when an error occurred: %v\n", err) + for _, pendingPod := range pendingPods { + fmt.Fprintf(o.errOut, "%s/%s\n", "pod", pendingPod.Name) + } + } + return err } func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) { @@ -243,6 +279,8 @@ func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, return o.client.Batch().Jobs(sr.Reference.Namespace).Get(sr.Reference.Name) case "ReplicaSet": return o.client.Extensions().ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name) + case "StatefulSet": + return o.client.Apps().StatefulSets(sr.Reference.Namespace).Get(sr.Reference.Name) } return nil, fmt.Errorf("Unknown controller kind %q", sr.Reference.Kind) } @@ -252,7 +290,6 @@ func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, err if !found { return nil, nil } - // Now verify that the specified creator actually exists. sr := &api.SerializedReference{} if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil { @@ -380,21 +417,58 @@ func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) { return []api.Pod{}, errors.New(fs.Message()) } if len(ws) > 0 { - fmt.Fprintf(o.out, "WARNING: %s\n", ws.Message()) + fmt.Fprintf(o.errOut, "WARNING: %s\n", ws.Message()) } return pods, nil } -// deletePods deletes the pods on the api server -func (o *DrainOptions) deletePods(pods []api.Pod) error { - deleteOptions := api.DeleteOptions{} +func (o *DrainOptions) deletePod(pod api.Pod) error { + deleteOptions := &api.DeleteOptions{} if o.GracePeriodSeconds >= 0 { gracePeriodSeconds := int64(o.GracePeriodSeconds) deleteOptions.GracePeriodSeconds = &gracePeriodSeconds } + return o.client.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions) +} + +func (o *DrainOptions) evictPod(pod api.Pod, policyGroupVersion string) error { + deleteOptions := &api.DeleteOptions{} + if o.GracePeriodSeconds >= 0 { + gracePeriodSeconds := int64(o.GracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriodSeconds + } + eviction := &policy.Eviction{ + TypeMeta: unversioned.TypeMeta{ + APIVersion: policyGroupVersion, + Kind: EvictionKind, + }, + ObjectMeta: api.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + DeleteOptions: deleteOptions, + } + // Remember to change change the URL manipulation func when Evction's version change + return o.client.Policy().Evictions(eviction.Namespace).Evict(eviction) +} + +// deleteOrEvictPods deletes or evicts the pods on the api server +func (o *DrainOptions) deleteOrEvictPods(pods []api.Pod) error { + if len(pods) == 0 { + return nil + } + + policyGroupVersion, err := SupportEviction(o.client) + if err != nil { + return err + } for _, pod := range pods { - err := o.client.Core().Pods(pod.Namespace).Delete(pod.Name, &deleteOptions) + if len(policyGroupVersion) > 0 { + err = o.evictPod(pod, policyGroupVersion) + } else { + err = o.deletePod(pod) + } if err != nil { return err } @@ -403,17 +477,11 @@ func (o *DrainOptions) deletePods(pods []api.Pod) error { getPodFn := func(namespace, name string) (*api.Pod, error) { return o.client.Core().Pods(namespace).Get(name) } - pendingPods, err := o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn) - if err != nil { - fmt.Fprintf(o.out, "There are pending pods when an error occured:\n") - for _, pendindPod := range pendingPods { - cmdutil.PrintSuccess(o.mapper, true, o.out, "pod", pendindPod.Name, false, "") - } - } + _, err = o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn) return err } -func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(namespace, name string) (*api.Pod, error)) ([]api.Pod, error) { +func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) { err := wait.PollImmediate(interval, timeout, func() (bool, error) { pendingPods := []api.Pod{} for i, pod := range pods { @@ -436,6 +504,38 @@ func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Dura return pods, err } +// SupportEviction uses Discovery API to find out if the server support eviction subresource +// If support, it will return its groupVersion; Otherwise, it will return "" +func SupportEviction(clientset *internalclientset.Clientset) (string, error) { + discoveryClient := clientset.Discovery() + groupList, err := discoveryClient.ServerGroups() + if err != nil { + return "", err + } + foundPolicyGroup := false + var policyGroupVersion string + for _, group := range groupList.Groups { + if group.Name == "policy" { + foundPolicyGroup = true + policyGroupVersion = group.PreferredVersion.GroupVersion + break + } + } + if !foundPolicyGroup { + return "", nil + } + resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1") + if err != nil { + return "", err + } + for _, resource := range resourceList.APIResources { + if resource.Name == EvictionSubresource && resource.Kind == EvictionKind { + return policyGroupVersion, nil + } + } + return "", nil +} + // RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for // "Unschedulable" is passed as the first arg. func (o *DrainOptions) RunCordonOrUncordon(desired bool) error { diff --git a/pkg/kubectl/cmd/drain_test.go b/pkg/kubectl/cmd/drain_test.go index 698fd67e33..57ad52784d 100644 --- a/pkg/kubectl/cmd/drain_test.go +++ b/pkg/kubectl/cmd/drain_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/apis/policy" "k8s.io/kubernetes/pkg/client/restclient/fake" "k8s.io/kubernetes/pkg/conversion" cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" @@ -48,6 +49,11 @@ import ( "k8s.io/kubernetes/pkg/util/wait" ) +const ( + EvictionMethod = "Eviction" + DeleteMethod = "Delete" +) + var node *api.Node var cordoned_node *api.Node @@ -463,95 +469,144 @@ func TestDrain(t *testing.T) { }, } - for _, test := range tests { - new_node := &api.Node{} - deleted := false - f, tf, codec, ns := cmdtesting.NewAPIFactory() - - tf.Client = &fake.RESTClient{ - NegotiatedSerializer: ns, - Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { - m := &MyReq{req} - switch { - case m.isFor("GET", "/nodes/node"): - return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, test.node)}, nil - case m.isFor("GET", "/namespaces/default/replicationcontrollers/rc"): - return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &test.rcs[0])}, nil - case m.isFor("GET", "/namespaces/default/daemonsets/ds"): - return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &ds)}, nil - case m.isFor("GET", "/namespaces/default/jobs/job"): - return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil - case m.isFor("GET", "/namespaces/default/replicasets/rs"): - return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil - case m.isFor("GET", "/namespaces/default/pods/bar"): - return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, nil)}, nil - case m.isFor("GET", "/pods"): - values, err := url.ParseQuery(req.URL.RawQuery) - if err != nil { - t.Fatalf("%s: unexpected error: %v", test.description, err) - } - get_params := make(url.Values) - get_params["fieldSelector"] = []string{"spec.nodeName=node"} - if !reflect.DeepEqual(get_params, values) { - t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, get_params, values) - } - return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: test.pods})}, nil - case m.isFor("GET", "/replicationcontrollers"): - return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.ReplicationControllerList{Items: test.rcs})}, nil - case m.isFor("PUT", "/nodes/node"): - data, err := ioutil.ReadAll(req.Body) - if err != nil { - t.Fatalf("%s: unexpected error: %v", test.description, err) - } - defer req.Body.Close() - if err := runtime.DecodeInto(codec, data, new_node); err != nil { - t.Fatalf("%s: unexpected error: %v", test.description, err) - } - if !reflect.DeepEqual(test.expected.Spec, new_node.Spec) { - t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, test.expected.Spec, new_node.Spec) - } - return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, new_node)}, nil - case m.isFor("DELETE", "/namespaces/default/pods/bar"): - deleted = true - return &http.Response{StatusCode: 204, Header: defaultHeader(), Body: objBody(codec, &test.pods[0])}, nil - default: - t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req) - return nil, nil - } - }), + testEviction := false + for i := 0; i < 2; i++ { + testEviction = !testEviction + var currMethod string + if testEviction { + currMethod = EvictionMethod + } else { + currMethod = DeleteMethod } - tf.ClientConfig = defaultClientConfig() + for _, test := range tests { + new_node := &api.Node{} + deleted := false + evicted := false + f, tf, codec, ns := cmdtesting.NewAPIFactory() + tf.Client = &fake.RESTClient{ + NegotiatedSerializer: ns, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + m := &MyReq{req} + switch { + case req.Method == "GET" && req.URL.Path == "/api": + apiVersions := unversioned.APIVersions{ + Versions: []string{"v1"}, + } + return genResponseWithJsonEncodedBody(apiVersions) + case req.Method == "GET" && req.URL.Path == "/apis": + groupList := unversioned.APIGroupList{ + Groups: []unversioned.APIGroup{ + { + Name: "policy", + PreferredVersion: unversioned.GroupVersionForDiscovery{ + GroupVersion: "policy/v1beta1", + }, + }, + }, + } + return genResponseWithJsonEncodedBody(groupList) + case req.Method == "GET" && req.URL.Path == "/api/v1": + resourceList := unversioned.APIResourceList{ + GroupVersion: "v1", + } + if testEviction { + resourceList.APIResources = []unversioned.APIResource{ + { + Name: EvictionSubresource, + Kind: EvictionKind, + }, + } + } + return genResponseWithJsonEncodedBody(resourceList) + case m.isFor("GET", "/nodes/node"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, test.node)}, nil + case m.isFor("GET", "/namespaces/default/replicationcontrollers/rc"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &test.rcs[0])}, nil + case m.isFor("GET", "/namespaces/default/daemonsets/ds"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &ds)}, nil + case m.isFor("GET", "/namespaces/default/jobs/job"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil + case m.isFor("GET", "/namespaces/default/replicasets/rs"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil + case m.isFor("GET", "/namespaces/default/pods/bar"): + return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, &api.Pod{})}, nil + case m.isFor("GET", "/pods"): + values, err := url.ParseQuery(req.URL.RawQuery) + if err != nil { + t.Fatalf("%s: unexpected error: %v", test.description, err) + } + get_params := make(url.Values) + get_params["fieldSelector"] = []string{"spec.nodeName=node"} + if !reflect.DeepEqual(get_params, values) { + t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, get_params, values) + } + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: test.pods})}, nil + case m.isFor("GET", "/replicationcontrollers"): + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.ReplicationControllerList{Items: test.rcs})}, nil + case m.isFor("PUT", "/nodes/node"): + data, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Fatalf("%s: unexpected error: %v", test.description, err) + } + defer req.Body.Close() + if err := runtime.DecodeInto(codec, data, new_node); err != nil { + t.Fatalf("%s: unexpected error: %v", test.description, err) + } + if !reflect.DeepEqual(test.expected.Spec, new_node.Spec) { + t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, test.expected.Spec, new_node.Spec) + } + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, new_node)}, nil + case m.isFor("DELETE", "/namespaces/default/pods/bar"): + deleted = true + return &http.Response{StatusCode: 204, Header: defaultHeader(), Body: objBody(codec, &test.pods[0])}, nil + case m.isFor("POST", "/namespaces/default/pods/bar/eviction"): + evicted = true + return &http.Response{StatusCode: 201, Header: defaultHeader(), Body: policyObjBody(&policy.Eviction{})}, nil + default: + t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req) + return nil, nil + } + }), + } + tf.ClientConfig = defaultClientConfig() - buf := bytes.NewBuffer([]byte{}) - cmd := NewCmdDrain(f, buf) + buf := bytes.NewBuffer([]byte{}) + errBuf := bytes.NewBuffer([]byte{}) + cmd := NewCmdDrain(f, buf, errBuf) - saw_fatal := false - func() { - defer func() { - // Recover from the panic below. - _ = recover() - // Restore cmdutil behavior - cmdutil.DefaultBehaviorOnFatal() + saw_fatal := false + func() { + defer func() { + // Recover from the panic below. + _ = recover() + // Restore cmdutil behavior + cmdutil.DefaultBehaviorOnFatal() + }() + cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) }) + cmd.SetArgs(test.args) + cmd.Execute() }() - cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) }) - cmd.SetArgs(test.args) - cmd.Execute() - }() - if test.expectFatal { - if !saw_fatal { - t.Fatalf("%s: unexpected non-error", test.description) + if test.expectFatal { + if !saw_fatal { + t.Fatalf("%s: unexpected non-error when using %s", test.description, currMethod) + } } - } - if test.expectDelete { - if !deleted { - t.Fatalf("%s: pod never deleted", test.description) + if test.expectDelete { + // Test Delete + if !testEviction && !deleted { + t.Fatalf("%s: pod never deleted", test.description) + } + // Test Eviction + if testEviction && !evicted { + t.Fatalf("%s: pod never evicted", test.description) + } } - } - if !test.expectDelete { - if deleted { - t.Fatalf("%s: unexpected delete", test.description) + if !test.expectDelete { + if deleted { + t.Fatalf("%s: unexpected delete when using %s", test.description, currMethod) + } } } }