mirror of https://github.com/k3s-io/k3s
Fix kubectl drain for statefulset and use eviciton for drain if possible
parent
0c7421fb51
commit
b73fae6c55
|
@ -413,6 +413,17 @@ func IsInternalError(err error) bool {
|
|||
return reasonForError(err) == unversioned.StatusReasonInternalError
|
||||
}
|
||||
|
||||
// IsTooManyRequests determines if err is an error which indicates that there are too many requests
|
||||
// that the server cannot handle.
|
||||
// TODO: update IsTooManyRequests() when the TooManyRequests(429) error returned from the API server has a non-empty Reason field
|
||||
func IsTooManyRequests(err error) bool {
|
||||
switch t := err.(type) {
|
||||
case APIStatus:
|
||||
return t.Status().Code == StatusTooManyRequests
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsUnexpectedServerError returns true if the server response was not in the expected API format,
|
||||
// and may be the result of another HTTP actor.
|
||||
func IsUnexpectedServerError(err error) bool {
|
||||
|
|
|
@ -90,6 +90,9 @@ type PodDisruptionBudgetList struct {
|
|||
Items []PodDisruptionBudget `json:"items"`
|
||||
}
|
||||
|
||||
// +genclient=true
|
||||
// +noMethods=true
|
||||
|
||||
// Eviction evicts a pod from its node subject to certain policies and safety constraints.
|
||||
// This is a subresource of Pod. A request to cause such an eviction is
|
||||
// created by POSTing to .../pods/<pod name>/eviction.
|
||||
|
|
|
@ -14,6 +14,8 @@ go_library(
|
|||
name = "go_default_library",
|
||||
srcs = [
|
||||
"doc.go",
|
||||
"eviction.go",
|
||||
"eviction_expansion.go",
|
||||
"generated_expansion.go",
|
||||
"poddisruptionbudget.go",
|
||||
"policy_client.go",
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package internalversion
|
||||
|
||||
import (
|
||||
restclient "k8s.io/kubernetes/pkg/client/restclient"
|
||||
)
|
||||
|
||||
// EvictionsGetter has a method to return a EvictionInterface.
|
||||
// A group's client should implement this interface.
|
||||
type EvictionsGetter interface {
|
||||
Evictions(namespace string) EvictionInterface
|
||||
}
|
||||
|
||||
// EvictionInterface has methods to work with Eviction resources.
|
||||
type EvictionInterface interface {
|
||||
EvictionExpansion
|
||||
}
|
||||
|
||||
// evictions implements EvictionInterface
|
||||
type evictions struct {
|
||||
client restclient.Interface
|
||||
ns string
|
||||
}
|
||||
|
||||
// newEvictions returns a Evictions
|
||||
func newEvictions(c *PolicyClient, namespace string) *evictions {
|
||||
return &evictions{
|
||||
client: c.RESTClient(),
|
||||
ns: namespace,
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package internalversion
|
||||
|
||||
import (
|
||||
policy "k8s.io/kubernetes/pkg/apis/policy"
|
||||
)
|
||||
|
||||
// The EvictionExpansion interface allows manually adding extra methods to the ScaleInterface.
|
||||
type EvictionExpansion interface {
|
||||
Evict(eviction *policy.Eviction) error
|
||||
}
|
||||
|
||||
func (c *evictions) Evict(eviction *policy.Eviction) error {
|
||||
return c.client.Post().
|
||||
AbsPath("/api/v1").
|
||||
Namespace(eviction.Namespace).
|
||||
Resource("pods").
|
||||
Name(eviction.Name).
|
||||
SubResource("eviction").
|
||||
Body(eviction).
|
||||
Do().
|
||||
Error()
|
||||
}
|
|
@ -14,6 +14,8 @@ go_library(
|
|||
name = "go_default_library",
|
||||
srcs = [
|
||||
"doc.go",
|
||||
"fake_eviction.go",
|
||||
"fake_eviction_expansion.go",
|
||||
"fake_poddisruptionbudget.go",
|
||||
"fake_policy_client.go",
|
||||
],
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package fake
|
||||
|
||||
// FakeEvictions implements EvictionInterface
|
||||
type FakeEvictions struct {
|
||||
Fake *FakePolicy
|
||||
ns string
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package fake
|
||||
|
||||
import (
|
||||
unversioned "k8s.io/kubernetes/pkg/api/unversioned"
|
||||
policy "k8s.io/kubernetes/pkg/apis/policy"
|
||||
core "k8s.io/kubernetes/pkg/client/testing/core"
|
||||
)
|
||||
|
||||
func (c *FakeEvictions) Evict(eviction *policy.Eviction) error {
|
||||
action := core.GetActionImpl{}
|
||||
action.Verb = "post"
|
||||
action.Namespace = c.ns
|
||||
action.Resource = unversioned.GroupVersionResource{Group: "", Version: "", Resource: "pods"}
|
||||
action.Subresource = "eviction"
|
||||
_, err := c.Fake.Invokes(action, eviction)
|
||||
return err
|
||||
}
|
|
@ -26,6 +26,10 @@ type FakePolicy struct {
|
|||
*core.Fake
|
||||
}
|
||||
|
||||
func (c *FakePolicy) Evictions(namespace string) internalversion.EvictionInterface {
|
||||
return &FakeEvictions{c, namespace}
|
||||
}
|
||||
|
||||
func (c *FakePolicy) PodDisruptionBudgets(namespace string) internalversion.PodDisruptionBudgetInterface {
|
||||
return &FakePodDisruptionBudgets{c, namespace}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
|
||||
type PolicyInterface interface {
|
||||
RESTClient() restclient.Interface
|
||||
EvictionsGetter
|
||||
PodDisruptionBudgetsGetter
|
||||
}
|
||||
|
||||
|
@ -32,6 +33,10 @@ type PolicyClient struct {
|
|||
restClient restclient.Interface
|
||||
}
|
||||
|
||||
func (c *PolicyClient) Evictions(namespace string) EvictionInterface {
|
||||
return newEvictions(c, namespace)
|
||||
}
|
||||
|
||||
func (c *PolicyClient) PodDisruptionBudgets(namespace string) PodDisruptionBudgetInterface {
|
||||
return newPodDisruptionBudgets(c, namespace)
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ load(
|
|||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"eviction.go",
|
||||
"expansion_generated.go",
|
||||
"poddisruptionbudget.go",
|
||||
],
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// This file was automatically generated by lister-gen with arguments: --input-dirs=[k8s.io/kubernetes/pkg/api,k8s.io/kubernetes/pkg/api/v1,k8s.io/kubernetes/pkg/apis/abac,k8s.io/kubernetes/pkg/apis/abac/v0,k8s.io/kubernetes/pkg/apis/abac/v1beta1,k8s.io/kubernetes/pkg/apis/apps,k8s.io/kubernetes/pkg/apis/apps/v1beta1,k8s.io/kubernetes/pkg/apis/authentication,k8s.io/kubernetes/pkg/apis/authentication/v1beta1,k8s.io/kubernetes/pkg/apis/authorization,k8s.io/kubernetes/pkg/apis/authorization/v1beta1,k8s.io/kubernetes/pkg/apis/autoscaling,k8s.io/kubernetes/pkg/apis/autoscaling/v1,k8s.io/kubernetes/pkg/apis/batch,k8s.io/kubernetes/pkg/apis/batch/v1,k8s.io/kubernetes/pkg/apis/batch/v2alpha1,k8s.io/kubernetes/pkg/apis/certificates,k8s.io/kubernetes/pkg/apis/certificates/v1alpha1,k8s.io/kubernetes/pkg/apis/componentconfig,k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1,k8s.io/kubernetes/pkg/apis/extensions,k8s.io/kubernetes/pkg/apis/extensions/v1beta1,k8s.io/kubernetes/pkg/apis/imagepolicy,k8s.io/kubernetes/pkg/apis/imagepolicy/v1alpha1,k8s.io/kubernetes/pkg/apis/policy,k8s.io/kubernetes/pkg/apis/policy/v1alpha1,k8s.io/kubernetes/pkg/apis/policy/v1beta1,k8s.io/kubernetes/pkg/apis/rbac,k8s.io/kubernetes/pkg/apis/rbac/v1alpha1,k8s.io/kubernetes/pkg/apis/storage,k8s.io/kubernetes/pkg/apis/storage/v1beta1]
|
||||
|
||||
package internalversion
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
policy "k8s.io/kubernetes/pkg/apis/policy"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
)
|
||||
|
||||
// EvictionLister helps list Evictions.
|
||||
type EvictionLister interface {
|
||||
// List lists all Evictions in the indexer.
|
||||
List(selector labels.Selector) (ret []*policy.Eviction, err error)
|
||||
// Evictions returns an object that can list and get Evictions.
|
||||
Evictions(namespace string) EvictionNamespaceLister
|
||||
EvictionListerExpansion
|
||||
}
|
||||
|
||||
// evictionLister implements the EvictionLister interface.
|
||||
type evictionLister struct {
|
||||
indexer cache.Indexer
|
||||
}
|
||||
|
||||
// NewEvictionLister returns a new EvictionLister.
|
||||
func NewEvictionLister(indexer cache.Indexer) EvictionLister {
|
||||
return &evictionLister{indexer: indexer}
|
||||
}
|
||||
|
||||
// List lists all Evictions in the indexer.
|
||||
func (s *evictionLister) List(selector labels.Selector) (ret []*policy.Eviction, err error) {
|
||||
err = cache.ListAll(s.indexer, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*policy.Eviction))
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
// Evictions returns an object that can list and get Evictions.
|
||||
func (s *evictionLister) Evictions(namespace string) EvictionNamespaceLister {
|
||||
return evictionNamespaceLister{indexer: s.indexer, namespace: namespace}
|
||||
}
|
||||
|
||||
// EvictionNamespaceLister helps list and get Evictions.
|
||||
type EvictionNamespaceLister interface {
|
||||
// List lists all Evictions in the indexer for a given namespace.
|
||||
List(selector labels.Selector) (ret []*policy.Eviction, err error)
|
||||
// Get retrieves the Eviction from the indexer for a given namespace and name.
|
||||
Get(name string) (*policy.Eviction, error)
|
||||
EvictionNamespaceListerExpansion
|
||||
}
|
||||
|
||||
// evictionNamespaceLister implements the EvictionNamespaceLister
|
||||
// interface.
|
||||
type evictionNamespaceLister struct {
|
||||
indexer cache.Indexer
|
||||
namespace string
|
||||
}
|
||||
|
||||
// List lists all Evictions in the indexer for a given namespace.
|
||||
func (s evictionNamespaceLister) List(selector labels.Selector) (ret []*policy.Eviction, err error) {
|
||||
err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*policy.Eviction))
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
// Get retrieves the Eviction from the indexer for a given namespace and name.
|
||||
func (s evictionNamespaceLister) Get(name string) (*policy.Eviction, error) {
|
||||
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, errors.NewNotFound(policy.Resource("eviction"), name)
|
||||
}
|
||||
return obj.(*policy.Eviction), nil
|
||||
}
|
|
@ -18,6 +18,14 @@ limitations under the License.
|
|||
|
||||
package internalversion
|
||||
|
||||
// EvictionListerExpansion allows custom methods to be added to
|
||||
// EvictionLister.
|
||||
type EvictionListerExpansion interface{}
|
||||
|
||||
// EvictionNamespaceListerExpansion allows custom methods to be added to
|
||||
// EvictionNamespaeLister.
|
||||
type EvictionNamespaceListerExpansion interface{}
|
||||
|
||||
// PodDisruptionBudgetListerExpansion allows custom methods to be added to
|
||||
// PodDisruptionBudgetLister.
|
||||
type PodDisruptionBudgetListerExpansion interface{}
|
||||
|
|
|
@ -70,6 +70,7 @@ go_library(
|
|||
"//pkg/apimachinery/registered:go_default_library",
|
||||
"//pkg/apis/batch/v1:go_default_library",
|
||||
"//pkg/apis/extensions/v1beta1:go_default_library",
|
||||
"//pkg/apis/policy:go_default_library",
|
||||
"//pkg/client/clientset_generated/internalclientset:go_default_library",
|
||||
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
|
||||
"//pkg/client/restclient:go_default_library",
|
||||
|
@ -173,6 +174,7 @@ go_test(
|
|||
"//pkg/apimachinery/registered:go_default_library",
|
||||
"//pkg/apis/batch:go_default_library",
|
||||
"//pkg/apis/extensions:go_default_library",
|
||||
"//pkg/apis/policy:go_default_library",
|
||||
"//pkg/client/restclient:go_default_library",
|
||||
"//pkg/client/restclient/fake:go_default_library",
|
||||
"//pkg/client/typed/dynamic:go_default_library",
|
||||
|
|
|
@ -255,7 +255,7 @@ func NewKubectlCommand(f cmdutil.Factory, in io.Reader, out, err io.Writer) *cob
|
|||
NewCmdTop(f, out, err),
|
||||
NewCmdCordon(f, out),
|
||||
NewCmdUncordon(f, out),
|
||||
NewCmdDrain(f, out),
|
||||
NewCmdDrain(f, out, err),
|
||||
NewCmdTaint(f, out),
|
||||
},
|
||||
},
|
||||
|
|
|
@ -18,6 +18,7 @@ package cmd
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -28,6 +29,7 @@ import (
|
|||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
|
@ -110,6 +112,14 @@ func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser {
|
|||
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj))))
|
||||
}
|
||||
|
||||
func policyObjBody(obj runtime.Object) io.ReadCloser {
|
||||
return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(testapi.Policy.Codec(), obj))))
|
||||
}
|
||||
|
||||
func bytesBody(bodyBytes []byte) io.ReadCloser {
|
||||
return ioutil.NopCloser(bytes.NewReader(bodyBytes))
|
||||
}
|
||||
|
||||
func stringBody(body string) io.ReadCloser {
|
||||
return ioutil.NopCloser(bytes.NewReader([]byte(body)))
|
||||
}
|
||||
|
@ -616,3 +626,11 @@ func TestNormalizationFuncGlobalExistence(t *testing.T) {
|
|||
t.Fatal("child and root commands should have the same normalization functions")
|
||||
}
|
||||
}
|
||||
|
||||
func genResponseWithJsonEncodedBody(bodyStruct interface{}) (*http.Response, error) {
|
||||
jsonBytes, err := json.Marshal(bodyStruct)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bytesBody(jsonBytes)}, nil
|
||||
}
|
||||
|
|
|
@ -20,15 +20,19 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
apierrors "k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/policy"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
|
@ -49,10 +53,12 @@ type DrainOptions struct {
|
|||
GracePeriodSeconds int
|
||||
IgnoreDaemonsets bool
|
||||
Timeout time.Duration
|
||||
backOff clockwork.Clock
|
||||
DeleteLocalData bool
|
||||
mapper meta.RESTMapper
|
||||
nodeInfo *resource.Info
|
||||
out io.Writer
|
||||
errOut io.Writer
|
||||
typer runtime.ObjectTyper
|
||||
}
|
||||
|
||||
|
@ -67,6 +73,9 @@ type fatal struct {
|
|||
}
|
||||
|
||||
const (
|
||||
EvictionKind = "Eviction"
|
||||
EvictionSubresource = "pods/eviction"
|
||||
|
||||
kDaemonsetFatal = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)"
|
||||
kDaemonsetWarning = "Ignoring DaemonSet-managed pods"
|
||||
kLocalStorageFatal = "pods with local storage (use --delete-local-data to override)"
|
||||
|
@ -152,8 +161,8 @@ var (
|
|||
$ kubectl drain foo --grace-period=900`)
|
||||
)
|
||||
|
||||
func NewCmdDrain(f cmdutil.Factory, out io.Writer) *cobra.Command {
|
||||
options := &DrainOptions{factory: f, out: out}
|
||||
func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
|
||||
options := &DrainOptions{factory: f, out: out, errOut: errOut, backOff: clockwork.NewRealClock()}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "drain NODE",
|
||||
|
@ -221,16 +230,43 @@ func (o *DrainOptions) RunDrain() error {
|
|||
return err
|
||||
}
|
||||
|
||||
err := o.deleteOrEvictPodsSimple()
|
||||
// TODO: update IsTooManyRequests() when the TooManyRequests(429) error returned from the API server has a non-empty Reason field
|
||||
for i := 1; i <= maxPatchRetry && apierrors.IsTooManyRequests(err); i++ {
|
||||
if i > triesBeforeBackOff {
|
||||
currBackOffPeriod := time.Duration(math.Exp2(float64(i-triesBeforeBackOff))) * backOffPeriod
|
||||
fmt.Fprintf(o.errOut, "Retry in %v\n", currBackOffPeriod)
|
||||
o.backOff.Sleep(currBackOffPeriod)
|
||||
}
|
||||
fmt.Fprintf(o.errOut, "Retrying\n")
|
||||
err = o.deleteOrEvictPodsSimple()
|
||||
}
|
||||
if err == nil {
|
||||
cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, false, "drained")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (o *DrainOptions) deleteOrEvictPodsSimple() error {
|
||||
pods, err := o.getPodsForDeletion()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = o.deletePods(pods); err != nil {
|
||||
return err
|
||||
if o.Timeout == 0 {
|
||||
o.Timeout = kubectl.Timeout + time.Duration(10*len(pods))*time.Second
|
||||
}
|
||||
cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, false, "drained")
|
||||
return nil
|
||||
err = o.deleteOrEvictPods(pods)
|
||||
if err != nil {
|
||||
pendingPods, newErr := o.getPodsForDeletion()
|
||||
if newErr != nil {
|
||||
return newErr
|
||||
}
|
||||
fmt.Fprintf(o.errOut, "There are pending pods when an error occurred: %v\n", err)
|
||||
for _, pendingPod := range pendingPods {
|
||||
fmt.Fprintf(o.errOut, "%s/%s\n", "pod", pendingPod.Name)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) {
|
||||
|
@ -243,6 +279,8 @@ func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{},
|
|||
return o.client.Batch().Jobs(sr.Reference.Namespace).Get(sr.Reference.Name)
|
||||
case "ReplicaSet":
|
||||
return o.client.Extensions().ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name)
|
||||
case "StatefulSet":
|
||||
return o.client.Apps().StatefulSets(sr.Reference.Namespace).Get(sr.Reference.Name)
|
||||
}
|
||||
return nil, fmt.Errorf("Unknown controller kind %q", sr.Reference.Kind)
|
||||
}
|
||||
|
@ -252,7 +290,6 @@ func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, err
|
|||
if !found {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Now verify that the specified creator actually exists.
|
||||
sr := &api.SerializedReference{}
|
||||
if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil {
|
||||
|
@ -380,21 +417,58 @@ func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
|
|||
return []api.Pod{}, errors.New(fs.Message())
|
||||
}
|
||||
if len(ws) > 0 {
|
||||
fmt.Fprintf(o.out, "WARNING: %s\n", ws.Message())
|
||||
fmt.Fprintf(o.errOut, "WARNING: %s\n", ws.Message())
|
||||
}
|
||||
return pods, nil
|
||||
}
|
||||
|
||||
// deletePods deletes the pods on the api server
|
||||
func (o *DrainOptions) deletePods(pods []api.Pod) error {
|
||||
deleteOptions := api.DeleteOptions{}
|
||||
func (o *DrainOptions) deletePod(pod api.Pod) error {
|
||||
deleteOptions := &api.DeleteOptions{}
|
||||
if o.GracePeriodSeconds >= 0 {
|
||||
gracePeriodSeconds := int64(o.GracePeriodSeconds)
|
||||
deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
|
||||
}
|
||||
return o.client.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
|
||||
}
|
||||
|
||||
func (o *DrainOptions) evictPod(pod api.Pod, policyGroupVersion string) error {
|
||||
deleteOptions := &api.DeleteOptions{}
|
||||
if o.GracePeriodSeconds >= 0 {
|
||||
gracePeriodSeconds := int64(o.GracePeriodSeconds)
|
||||
deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
|
||||
}
|
||||
eviction := &policy.Eviction{
|
||||
TypeMeta: unversioned.TypeMeta{
|
||||
APIVersion: policyGroupVersion,
|
||||
Kind: EvictionKind,
|
||||
},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
},
|
||||
DeleteOptions: deleteOptions,
|
||||
}
|
||||
// Remember to change change the URL manipulation func when Evction's version change
|
||||
return o.client.Policy().Evictions(eviction.Namespace).Evict(eviction)
|
||||
}
|
||||
|
||||
// deleteOrEvictPods deletes or evicts the pods on the api server
|
||||
func (o *DrainOptions) deleteOrEvictPods(pods []api.Pod) error {
|
||||
if len(pods) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
policyGroupVersion, err := SupportEviction(o.client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, pod := range pods {
|
||||
err := o.client.Core().Pods(pod.Namespace).Delete(pod.Name, &deleteOptions)
|
||||
if len(policyGroupVersion) > 0 {
|
||||
err = o.evictPod(pod, policyGroupVersion)
|
||||
} else {
|
||||
err = o.deletePod(pod)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -403,17 +477,11 @@ func (o *DrainOptions) deletePods(pods []api.Pod) error {
|
|||
getPodFn := func(namespace, name string) (*api.Pod, error) {
|
||||
return o.client.Core().Pods(namespace).Get(name)
|
||||
}
|
||||
pendingPods, err := o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn)
|
||||
if err != nil {
|
||||
fmt.Fprintf(o.out, "There are pending pods when an error occured:\n")
|
||||
for _, pendindPod := range pendingPods {
|
||||
cmdutil.PrintSuccess(o.mapper, true, o.out, "pod", pendindPod.Name, false, "")
|
||||
}
|
||||
}
|
||||
_, err = o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn)
|
||||
return err
|
||||
}
|
||||
|
||||
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(namespace, name string) (*api.Pod, error)) ([]api.Pod, error) {
|
||||
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) {
|
||||
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
|
||||
pendingPods := []api.Pod{}
|
||||
for i, pod := range pods {
|
||||
|
@ -436,6 +504,38 @@ func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Dura
|
|||
return pods, err
|
||||
}
|
||||
|
||||
// SupportEviction uses Discovery API to find out if the server support eviction subresource
|
||||
// If support, it will return its groupVersion; Otherwise, it will return ""
|
||||
func SupportEviction(clientset *internalclientset.Clientset) (string, error) {
|
||||
discoveryClient := clientset.Discovery()
|
||||
groupList, err := discoveryClient.ServerGroups()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
foundPolicyGroup := false
|
||||
var policyGroupVersion string
|
||||
for _, group := range groupList.Groups {
|
||||
if group.Name == "policy" {
|
||||
foundPolicyGroup = true
|
||||
policyGroupVersion = group.PreferredVersion.GroupVersion
|
||||
break
|
||||
}
|
||||
}
|
||||
if !foundPolicyGroup {
|
||||
return "", nil
|
||||
}
|
||||
resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
for _, resource := range resourceList.APIResources {
|
||||
if resource.Name == EvictionSubresource && resource.Kind == EvictionKind {
|
||||
return policyGroupVersion, nil
|
||||
}
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
|
||||
// "Unschedulable" is passed as the first arg.
|
||||
func (o *DrainOptions) RunCordonOrUncordon(desired bool) error {
|
||||
|
|
|
@ -39,6 +39,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/batch"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/apis/policy"
|
||||
"k8s.io/kubernetes/pkg/client/restclient/fake"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
|
||||
|
@ -48,6 +49,11 @@ import (
|
|||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
const (
|
||||
EvictionMethod = "Eviction"
|
||||
DeleteMethod = "Delete"
|
||||
)
|
||||
|
||||
var node *api.Node
|
||||
var cordoned_node *api.Node
|
||||
|
||||
|
@ -463,95 +469,144 @@ func TestDrain(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
new_node := &api.Node{}
|
||||
deleted := false
|
||||
f, tf, codec, ns := cmdtesting.NewAPIFactory()
|
||||
|
||||
tf.Client = &fake.RESTClient{
|
||||
NegotiatedSerializer: ns,
|
||||
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
|
||||
m := &MyReq{req}
|
||||
switch {
|
||||
case m.isFor("GET", "/nodes/node"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, test.node)}, nil
|
||||
case m.isFor("GET", "/namespaces/default/replicationcontrollers/rc"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &test.rcs[0])}, nil
|
||||
case m.isFor("GET", "/namespaces/default/daemonsets/ds"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &ds)}, nil
|
||||
case m.isFor("GET", "/namespaces/default/jobs/job"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil
|
||||
case m.isFor("GET", "/namespaces/default/replicasets/rs"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil
|
||||
case m.isFor("GET", "/namespaces/default/pods/bar"):
|
||||
return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, nil)}, nil
|
||||
case m.isFor("GET", "/pods"):
|
||||
values, err := url.ParseQuery(req.URL.RawQuery)
|
||||
if err != nil {
|
||||
t.Fatalf("%s: unexpected error: %v", test.description, err)
|
||||
}
|
||||
get_params := make(url.Values)
|
||||
get_params["fieldSelector"] = []string{"spec.nodeName=node"}
|
||||
if !reflect.DeepEqual(get_params, values) {
|
||||
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, get_params, values)
|
||||
}
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: test.pods})}, nil
|
||||
case m.isFor("GET", "/replicationcontrollers"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.ReplicationControllerList{Items: test.rcs})}, nil
|
||||
case m.isFor("PUT", "/nodes/node"):
|
||||
data, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("%s: unexpected error: %v", test.description, err)
|
||||
}
|
||||
defer req.Body.Close()
|
||||
if err := runtime.DecodeInto(codec, data, new_node); err != nil {
|
||||
t.Fatalf("%s: unexpected error: %v", test.description, err)
|
||||
}
|
||||
if !reflect.DeepEqual(test.expected.Spec, new_node.Spec) {
|
||||
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, test.expected.Spec, new_node.Spec)
|
||||
}
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, new_node)}, nil
|
||||
case m.isFor("DELETE", "/namespaces/default/pods/bar"):
|
||||
deleted = true
|
||||
return &http.Response{StatusCode: 204, Header: defaultHeader(), Body: objBody(codec, &test.pods[0])}, nil
|
||||
default:
|
||||
t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req)
|
||||
return nil, nil
|
||||
}
|
||||
}),
|
||||
testEviction := false
|
||||
for i := 0; i < 2; i++ {
|
||||
testEviction = !testEviction
|
||||
var currMethod string
|
||||
if testEviction {
|
||||
currMethod = EvictionMethod
|
||||
} else {
|
||||
currMethod = DeleteMethod
|
||||
}
|
||||
tf.ClientConfig = defaultClientConfig()
|
||||
for _, test := range tests {
|
||||
new_node := &api.Node{}
|
||||
deleted := false
|
||||
evicted := false
|
||||
f, tf, codec, ns := cmdtesting.NewAPIFactory()
|
||||
tf.Client = &fake.RESTClient{
|
||||
NegotiatedSerializer: ns,
|
||||
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
|
||||
m := &MyReq{req}
|
||||
switch {
|
||||
case req.Method == "GET" && req.URL.Path == "/api":
|
||||
apiVersions := unversioned.APIVersions{
|
||||
Versions: []string{"v1"},
|
||||
}
|
||||
return genResponseWithJsonEncodedBody(apiVersions)
|
||||
case req.Method == "GET" && req.URL.Path == "/apis":
|
||||
groupList := unversioned.APIGroupList{
|
||||
Groups: []unversioned.APIGroup{
|
||||
{
|
||||
Name: "policy",
|
||||
PreferredVersion: unversioned.GroupVersionForDiscovery{
|
||||
GroupVersion: "policy/v1beta1",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return genResponseWithJsonEncodedBody(groupList)
|
||||
case req.Method == "GET" && req.URL.Path == "/api/v1":
|
||||
resourceList := unversioned.APIResourceList{
|
||||
GroupVersion: "v1",
|
||||
}
|
||||
if testEviction {
|
||||
resourceList.APIResources = []unversioned.APIResource{
|
||||
{
|
||||
Name: EvictionSubresource,
|
||||
Kind: EvictionKind,
|
||||
},
|
||||
}
|
||||
}
|
||||
return genResponseWithJsonEncodedBody(resourceList)
|
||||
case m.isFor("GET", "/nodes/node"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, test.node)}, nil
|
||||
case m.isFor("GET", "/namespaces/default/replicationcontrollers/rc"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &test.rcs[0])}, nil
|
||||
case m.isFor("GET", "/namespaces/default/daemonsets/ds"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &ds)}, nil
|
||||
case m.isFor("GET", "/namespaces/default/jobs/job"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil
|
||||
case m.isFor("GET", "/namespaces/default/replicasets/rs"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil
|
||||
case m.isFor("GET", "/namespaces/default/pods/bar"):
|
||||
return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, &api.Pod{})}, nil
|
||||
case m.isFor("GET", "/pods"):
|
||||
values, err := url.ParseQuery(req.URL.RawQuery)
|
||||
if err != nil {
|
||||
t.Fatalf("%s: unexpected error: %v", test.description, err)
|
||||
}
|
||||
get_params := make(url.Values)
|
||||
get_params["fieldSelector"] = []string{"spec.nodeName=node"}
|
||||
if !reflect.DeepEqual(get_params, values) {
|
||||
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, get_params, values)
|
||||
}
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: test.pods})}, nil
|
||||
case m.isFor("GET", "/replicationcontrollers"):
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.ReplicationControllerList{Items: test.rcs})}, nil
|
||||
case m.isFor("PUT", "/nodes/node"):
|
||||
data, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("%s: unexpected error: %v", test.description, err)
|
||||
}
|
||||
defer req.Body.Close()
|
||||
if err := runtime.DecodeInto(codec, data, new_node); err != nil {
|
||||
t.Fatalf("%s: unexpected error: %v", test.description, err)
|
||||
}
|
||||
if !reflect.DeepEqual(test.expected.Spec, new_node.Spec) {
|
||||
t.Fatalf("%s: expected:\n%v\nsaw:\n%v\n", test.description, test.expected.Spec, new_node.Spec)
|
||||
}
|
||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, new_node)}, nil
|
||||
case m.isFor("DELETE", "/namespaces/default/pods/bar"):
|
||||
deleted = true
|
||||
return &http.Response{StatusCode: 204, Header: defaultHeader(), Body: objBody(codec, &test.pods[0])}, nil
|
||||
case m.isFor("POST", "/namespaces/default/pods/bar/eviction"):
|
||||
evicted = true
|
||||
return &http.Response{StatusCode: 201, Header: defaultHeader(), Body: policyObjBody(&policy.Eviction{})}, nil
|
||||
default:
|
||||
t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req)
|
||||
return nil, nil
|
||||
}
|
||||
}),
|
||||
}
|
||||
tf.ClientConfig = defaultClientConfig()
|
||||
|
||||
buf := bytes.NewBuffer([]byte{})
|
||||
cmd := NewCmdDrain(f, buf)
|
||||
buf := bytes.NewBuffer([]byte{})
|
||||
errBuf := bytes.NewBuffer([]byte{})
|
||||
cmd := NewCmdDrain(f, buf, errBuf)
|
||||
|
||||
saw_fatal := false
|
||||
func() {
|
||||
defer func() {
|
||||
// Recover from the panic below.
|
||||
_ = recover()
|
||||
// Restore cmdutil behavior
|
||||
cmdutil.DefaultBehaviorOnFatal()
|
||||
saw_fatal := false
|
||||
func() {
|
||||
defer func() {
|
||||
// Recover from the panic below.
|
||||
_ = recover()
|
||||
// Restore cmdutil behavior
|
||||
cmdutil.DefaultBehaviorOnFatal()
|
||||
}()
|
||||
cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) })
|
||||
cmd.SetArgs(test.args)
|
||||
cmd.Execute()
|
||||
}()
|
||||
cmdutil.BehaviorOnFatal(func(e string, code int) { saw_fatal = true; panic(e) })
|
||||
cmd.SetArgs(test.args)
|
||||
cmd.Execute()
|
||||
}()
|
||||
|
||||
if test.expectFatal {
|
||||
if !saw_fatal {
|
||||
t.Fatalf("%s: unexpected non-error", test.description)
|
||||
if test.expectFatal {
|
||||
if !saw_fatal {
|
||||
t.Fatalf("%s: unexpected non-error when using %s", test.description, currMethod)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if test.expectDelete {
|
||||
if !deleted {
|
||||
t.Fatalf("%s: pod never deleted", test.description)
|
||||
if test.expectDelete {
|
||||
// Test Delete
|
||||
if !testEviction && !deleted {
|
||||
t.Fatalf("%s: pod never deleted", test.description)
|
||||
}
|
||||
// Test Eviction
|
||||
if testEviction && !evicted {
|
||||
t.Fatalf("%s: pod never evicted", test.description)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !test.expectDelete {
|
||||
if deleted {
|
||||
t.Fatalf("%s: unexpected delete", test.description)
|
||||
if !test.expectDelete {
|
||||
if deleted {
|
||||
t.Fatalf("%s: unexpected delete when using %s", test.description, currMethod)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue