Remove kubectl reapers

pull/8/head
Maciej Szulik 2018-05-17 17:27:44 +02:00
parent d7c40cf69e
commit 383872615d
No known key found for this signature in database
GPG Key ID: F15E55D276FA84C4
43 changed files with 189 additions and 1818 deletions

View File

@ -1468,7 +1468,7 @@ __EOF__
# Test that we can list this new CustomResource
kube::test::get_object_assert foos "{{range.items}}{{$id_field}}:{{end}}" ''
# Compare "old" output with experimental output and ensure both are the same
expected_output=$(kubectl get foos "${kube_flags[@]}")
expected_output=$(kubectl get foos "${kube_flags[@]}" | awk 'NF{NF--};1')
actual_output=$(kubectl get foos --server-print=false "${kube_flags[@]}" | awk 'NF{NF--};1')
kube::test::if_has_string "${actual_output}" "${expected_output}"
@ -1480,6 +1480,9 @@ __EOF__
kubectl delete rc frontend "${kube_flags[@]}"
kubectl delete ds bind "${kube_flags[@]}"
kubectl delete pod valid-pod "${kube_flags[@]}"
set +o nounset
set +o errexit
}
run_kubectl_get_tests() {

View File

@ -12,7 +12,6 @@ go_test(
"autoscale_test.go",
"clusterrolebinding_test.go",
"configmap_test.go",
"delete_test.go",
"deployment_test.go",
"env_file_test.go",
"generate_test.go",
@ -40,12 +39,9 @@ go_test(
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/testapi:go_default_library",
"//pkg/api/testing:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/kubectl/util:go_default_library",
"//pkg/util/pointer:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
@ -71,8 +67,6 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/rest/fake:go_default_library",
@ -92,7 +86,6 @@ go_library(
"clusterrolebinding.go",
"conditions.go",
"configmap.go",
"delete.go",
"deployment.go",
"doc.go",
"env_file.go",
@ -123,19 +116,15 @@ go_library(
"//pkg/api/pod:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/apps:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/apps/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion:go_default_library",
"//pkg/controller/deployment/util:go_default_library",
"//pkg/credentialprovider:go_default_library",
"//pkg/kubectl/apps:go_default_library",
"//pkg/kubectl/cmd/scalejob:go_default_library",
"//pkg/kubectl/genericclioptions/resource:go_default_library",
"//pkg/kubectl/util:go_default_library",
"//pkg/kubectl/util/hash:go_default_library",
@ -171,7 +160,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",

View File

@ -197,7 +197,6 @@ go_test(
"//pkg/apis/core:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/kubectl:go_default_library",
"//pkg/kubectl/cmd/create:go_default_library",
"//pkg/kubectl/cmd/testing:go_default_library",
"//pkg/kubectl/cmd/util:go_default_library",
@ -214,7 +213,6 @@ go_test(
"//vendor/github.com/googleapis/gnostic/OpenAPIv2:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/gopkg.in/yaml.v2:go_default_library",
"//vendor/k8s.io/api/autoscaling/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@ -232,9 +230,9 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch/testing:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/version:go_default_library",
"//vendor/k8s.io/client-go/dynamic/fake:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/rest/fake:go_default_library",
"//vendor/k8s.io/client-go/scale/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/metrics/pkg/apis/metrics/v1alpha1:go_default_library",

View File

@ -43,7 +43,6 @@ import (
scaleclient "k8s.io/client-go/scale"
oapi "k8s.io/kube-openapi/pkg/util/proto"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
@ -82,7 +81,6 @@ type ApplyOptions struct {
Mapper meta.RESTMapper
Scaler scaleclient.ScalesGetter
DynamicClient dynamic.Interface
ClientSetFunc func() (internalclientset.Interface, error)
OpenAPISchema openapi.Resources
Namespace string
@ -215,7 +213,6 @@ func (o *ApplyOptions) Complete(f cmdutil.Factory, cmd *cobra.Command) error {
o.ShouldIncludeUninitialized = cmdutil.ShouldIncludeUninitialized(cmd, o.Prune)
o.OpenAPISchema, _ = f.OpenAPISchema()
o.ClientSetFunc = f.ClientSet
o.Validator, err = f.Validator(cmdutil.GetFlagBool(cmd, "validate"))
o.Builder = f.NewBuilder()
o.Mapper, err = f.ToRESTMapper()
@ -406,7 +403,6 @@ func (o *ApplyOptions) Run() error {
mapping: info.Mapping,
helper: helper,
dynamicClient: o.DynamicClient,
clientsetFunc: o.ClientSetFunc,
overwrite: o.Overwrite,
backOff: clockwork.NewRealClock(),
force: o.DeleteOptions.ForceDeletion,
@ -414,7 +410,6 @@ func (o *ApplyOptions) Run() error {
timeout: o.DeleteOptions.Timeout,
gracePeriod: o.DeleteOptions.GracePeriod,
openapiSchema: openapiSchema,
scaleClient: o.Scaler,
}
patchBytes, patchedObject, err := patcher.patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)
@ -491,7 +486,6 @@ func (o *ApplyOptions) Run() error {
p := pruner{
mapper: o.Mapper,
dynamicClient: o.DynamicClient,
clientsetFunc: o.ClientSetFunc,
labelSelector: o.Selector,
visitedUids: visitedUids,
@ -580,7 +574,6 @@ func getRESTMappings(mapper meta.RESTMapper, pruneResources *[]pruneResource) (n
type pruner struct {
mapper meta.RESTMapper
dynamicClient dynamic.Interface
clientsetFunc func() (internalclientset.Interface, error)
visitedUids sets.String
labelSelector string
@ -630,7 +623,7 @@ func (p *pruner) prune(namespace string, mapping *meta.RESTMapping, includeUnini
}
name := metadata.GetName()
if !p.dryRun {
if err := p.delete(namespace, name, mapping, p.scaler); err != nil {
if err := p.delete(namespace, name, mapping); err != nil {
return err
}
}
@ -644,44 +637,31 @@ func (p *pruner) prune(namespace string, mapping *meta.RESTMapping, includeUnini
return nil
}
func (p *pruner) delete(namespace, name string, mapping *meta.RESTMapping, scaleClient scaleclient.ScalesGetter) error {
return runDelete(namespace, name, mapping, p.dynamicClient, p.cascade, p.gracePeriod, p.clientsetFunc, scaleClient)
func (p *pruner) delete(namespace, name string, mapping *meta.RESTMapping) error {
return runDelete(namespace, name, mapping, p.dynamicClient, p.cascade, p.gracePeriod)
}
func runDelete(namespace, name string, mapping *meta.RESTMapping, c dynamic.Interface, cascade bool, gracePeriod int, clientsetFunc func() (internalclientset.Interface, error), scaleClient scaleclient.ScalesGetter) error {
if !cascade {
return c.Resource(mapping.Resource).Namespace(namespace).Delete(name, nil)
}
cs, err := clientsetFunc()
if err != nil {
return err
}
r, err := kubectl.ReaperFor(mapping.GroupVersionKind.GroupKind(), cs, scaleClient)
if err != nil {
if _, ok := err.(*kubectl.NoSuchReaperError); !ok {
return err
}
return c.Resource(mapping.Resource).Namespace(namespace).Delete(name, nil)
}
var options *metav1.DeleteOptions
func runDelete(namespace, name string, mapping *meta.RESTMapping, c dynamic.Interface, cascade bool, gracePeriod int) error {
options := &metav1.DeleteOptions{}
if gracePeriod >= 0 {
options = metav1.NewDeleteOptions(int64(gracePeriod))
}
if err := r.Stop(namespace, name, 2*time.Minute, options); err != nil {
return err
policy := metav1.DeletePropagationForeground
if !cascade {
policy = metav1.DeletePropagationOrphan
}
return nil
options.PropagationPolicy = &policy
return c.Resource(mapping.Resource).Namespace(namespace).Delete(name, options)
}
func (p *patcher) delete(namespace, name string) error {
return runDelete(namespace, name, p.mapping, p.dynamicClient, p.cascade, p.gracePeriod, p.clientsetFunc, p.scaleClient)
return runDelete(namespace, name, p.mapping, p.dynamicClient, p.cascade, p.gracePeriod)
}
type patcher struct {
mapping *meta.RESTMapping
helper *resource.Helper
dynamicClient dynamic.Interface
clientsetFunc func() (internalclientset.Interface, error)
overwrite bool
backOff clockwork.Clock
@ -692,7 +672,6 @@ type patcher struct {
gracePeriod int
openapiSchema openapi.Resources
scaleClient scaleclient.ScalesGetter
}
func (p *patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
@ -790,17 +769,16 @@ func (p *patcher) patch(current runtime.Object, modified []byte, source, namespa
}
func (p *patcher) deleteAndCreate(original runtime.Object, modified []byte, namespace, name string) ([]byte, runtime.Object, error) {
err := p.delete(namespace, name)
if err != nil {
if err := p.delete(namespace, name); err != nil {
return modified, nil, err
}
err = wait.PollImmediate(kubectl.Interval, p.timeout, func() (bool, error) {
// TODO: use wait
if err := wait.PollImmediate(1*time.Second, p.timeout, func() (bool, error) {
if _, err := p.helper.Get(namespace, name, false); !errors.IsNotFound(err) {
return false, err
}
return true, nil
})
if err != nil {
}); err != nil {
return modified, nil, err
}
versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil)

View File

@ -31,19 +31,17 @@ import (
"github.com/spf13/cobra"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
kubeerr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
sptest "k8s.io/apimachinery/pkg/util/strategicpatch/testing"
dynamicfakeclient "k8s.io/client-go/dynamic/fake"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/rest/fake"
fakescale "k8s.io/client-go/scale/fake"
testcore "k8s.io/client-go/testing"
clienttesting "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/api/testapi"
api "k8s.io/kubernetes/pkg/apis/core"
@ -1211,18 +1209,18 @@ func checkPatchString(t *testing.T, req *http.Request) {
func TestForceApply(t *testing.T) {
initTestErrorHandler(t)
scheme := runtime.NewScheme()
nameRC, currentRC := readAndAnnotateReplicationController(t, filenameRC)
pathRC := "/namespaces/test/replicationcontrollers/" + nameRC
pathRCList := "/namespaces/test/replicationcontrollers"
expected := map[string]int{
"getOk": 7,
"getOk": 6,
"getNotFound": 1,
"getList": 1,
"getList": 0,
"patch": 6,
"delete": 1,
"post": 1,
}
scaleClientExpected := []string{"get", "update", "get", "get"}
for _, fn := range testingOpenAPISchemaFns {
t.Run("test apply with --force", func(t *testing.T) {
@ -1282,10 +1280,6 @@ func TestForceApply(t *testing.T) {
}
t.Fatalf("unexpected request: %#v after %v tries\n%#v", req.URL, counts["patch"], req)
return nil, nil
case strings.HasSuffix(p, pathRC) && m == "DELETE":
counts["delete"]++
deleted = true
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte{}))}, nil
case strings.HasSuffix(p, pathRC) && m == "PUT":
counts["put"]++
bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC))
@ -1303,43 +1297,18 @@ func TestForceApply(t *testing.T) {
}
}),
}
newReplicas := int32(3)
scaleClient := &fakescale.FakeScaleClient{}
scaleClient.AddReactor("get", "replicationcontrollers", func(rawAction testcore.Action) (handled bool, ret runtime.Object, err error) {
action := rawAction.(testcore.GetAction)
if action.GetName() != "test-rc" {
return true, nil, fmt.Errorf("expected = test-rc, got = %s", action.GetName())
fakeDynamicClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
fakeDynamicClient.PrependReactor("delete", "replicationcontrollers", func(action clienttesting.Action) (bool, runtime.Object, error) {
if deleteAction, ok := action.(clienttesting.DeleteAction); ok {
if deleteAction.GetName() == nameRC {
counts["delete"]++
deleted = true
return true, nil, nil
}
}
obj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: action.GetName(),
Namespace: action.GetNamespace(),
},
Spec: autoscalingv1.ScaleSpec{
Replicas: newReplicas,
},
}
return true, obj, nil
return false, nil, nil
})
scaleClient.AddReactor("update", "replicationcontrollers", func(rawAction testcore.Action) (handled bool, ret runtime.Object, err error) {
action := rawAction.(testcore.UpdateAction)
obj := action.GetObject().(*autoscalingv1.Scale)
if obj.Name != "test-rc" {
return true, nil, fmt.Errorf("expected = test-rc, got = %s", obj.Name)
}
newReplicas = obj.Spec.Replicas
return true, &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: obj.Name,
Namespace: action.GetNamespace(),
},
Spec: autoscalingv1.ScaleSpec{
Replicas: newReplicas,
},
}, nil
})
tf.ScaleGetter = scaleClient
tf.FakeDynamicClient = fakeDynamicClient
tf.OpenAPISchemaFunc = fn
tf.Client = tf.UnstructuredClient
tf.ClientConfigVal = &restclient.Config{}
@ -1364,22 +1333,6 @@ func TestForceApply(t *testing.T) {
if errBuf.String() != "" {
t.Fatalf("unexpected error output: %s", errBuf.String())
}
scale, err := scaleClient.Scales(tf.Namespace).Get(schema.GroupResource{Group: "", Resource: "replicationcontrollers"}, nameRC)
if err != nil {
t.Error(err)
}
if scale.Spec.Replicas != 0 {
t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
}
if len(scaleClient.Actions()) != len(scaleClientExpected) {
t.Fatalf("a fake scale client has unexpected amout of API calls, wanted = %d, got = %d", len(scaleClientExpected), len(scaleClient.Actions()))
}
for index, action := range scaleClient.Actions() {
if scaleClientExpected[index] != action.GetVerb() {
t.Errorf("unexpected API method called on a fake scale client, wanted = %s, got = %s at index = %d", scaleClientExpected[index], action.GetVerb(), index)
}
}
})
}
}

View File

@ -27,9 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
kubectlwait "k8s.io/kubernetes/pkg/kubectl/cmd/wait"
@ -103,8 +101,6 @@ type DeleteOptions struct {
ForceDeletion bool
WaitForDeletion bool
Reaper func(mapping *meta.RESTMapping) (kubectl.Reaper, error)
GracePeriod int
Timeout time.Duration
@ -128,15 +124,9 @@ func NewCmdDelete(f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra
Example: delete_example,
Run: func(cmd *cobra.Command, args []string) {
o := deleteFlags.ToOptions(nil, streams)
if err := o.Complete(f, args, cmd); err != nil {
cmdutil.CheckErr(err)
}
if err := o.Validate(cmd); err != nil {
cmdutil.CheckErr(cmdutil.UsageErrorf(cmd, err.Error()))
}
if err := o.RunDelete(); err != nil {
cmdutil.CheckErr(err)
}
cmdutil.CheckErr(o.Complete(f, args, cmd))
cmdutil.CheckErr(o.Validate(cmd))
cmdutil.CheckErr(o.RunDelete())
},
SuggestFor: []string{"rm"},
}
@ -178,8 +168,6 @@ func (o *DeleteOptions) Complete(f cmdutil.Factory, args []string, cmd *cobra.Co
o.WaitForDeletion = b
}
o.Reaper = f.Reaper
includeUninitialized := cmdutil.ShouldIncludeUninitialized(cmd, false)
r := f.NewBuilder().
Unstructured().
@ -234,62 +222,9 @@ func (o *DeleteOptions) Validate(cmd *cobra.Command) error {
}
func (o *DeleteOptions) RunDelete() error {
// By default use a reaper to delete all related resources.
if o.Cascade {
// TODO(juanvallejo): although o.Result can be accessed from the options
// it is also passed here so that callers of this method outside of the "delete"
// command do not have to tack it to the "delete" options as well.
// Find a cleaner way to approach this.
return o.ReapResult(o.Result, true, false)
}
return o.DeleteResult(o.Result)
}
func (o *DeleteOptions) ReapResult(r *resource.Result, isDefaultDelete, quiet bool) error {
found := 0
if o.IgnoreNotFound {
r = r.IgnoreErrors(errors.IsNotFound)
}
err := r.Visit(func(info *resource.Info, err error) error {
if err != nil {
return err
}
found++
reaper, err := o.Reaper(info.Mapping)
if err != nil {
// If there is no reaper for this resources and the user didn't explicitly ask for stop.
if kubectl.IsNoSuchReaperError(err) && isDefaultDelete {
// No client side reaper found. Let the server do cascading deletion.
return o.cascadingDeleteResource(info)
}
return cmdutil.AddSourceToErr("reaping", info.Source, err)
}
var options *metav1.DeleteOptions
if o.GracePeriod >= 0 {
options = metav1.NewDeleteOptions(int64(o.GracePeriod))
}
if err := reaper.Stop(info.Namespace, info.Name, o.Timeout, options); err != nil {
return cmdutil.AddSourceToErr("stopping", info.Source, err)
}
if o.WaitForDeletion {
if err := waitForObjectDeletion(info, o.Timeout); err != nil {
return cmdutil.AddSourceToErr("stopping", info.Source, err)
}
}
if !quiet {
o.PrintObj(info)
}
return nil
})
if err != nil {
return err
}
if found == 0 {
fmt.Fprintf(o.Out, "No resources found\n")
}
return nil
}
func (o *DeleteOptions) DeleteResult(r *resource.Result) error {
found := 0
if o.IgnoreNotFound {
@ -301,12 +236,14 @@ func (o *DeleteOptions) DeleteResult(r *resource.Result) error {
}
found++
// if we're here, it means that cascade=false (not the default), so we should orphan as requested
options := &metav1.DeleteOptions{}
if o.GracePeriod >= 0 {
options = metav1.NewDeleteOptions(int64(o.GracePeriod))
}
policy := metav1.DeletePropagationOrphan
policy := metav1.DeletePropagationForeground
if !o.Cascade {
policy = metav1.DeletePropagationOrphan
}
options.PropagationPolicy = &policy
return o.deleteResource(info, options)
})
@ -349,11 +286,6 @@ func (o *DeleteOptions) DeleteResult(r *resource.Result) error {
return err
}
func (o *DeleteOptions) cascadingDeleteResource(info *resource.Info) error {
policy := metav1.DeletePropagationForeground
return o.deleteResource(info, &metav1.DeleteOptions{PropagationPolicy: &policy})
}
func (o *DeleteOptions) deleteResource(info *resource.Info, deleteOptions *metav1.DeleteOptions) error {
if err := resource.NewHelper(info.Client, info.Mapping).DeleteWithOptions(info.Namespace, info.Name, deleteOptions); err != nil {
return cmdutil.AddSourceToErr("deleting", info.Source, err)
@ -386,24 +318,3 @@ func (o *DeleteOptions) PrintObj(info *resource.Info) {
// understandable output by default
fmt.Fprintf(o.Out, "%s \"%s\" %s\n", kindString, info.Name, operation)
}
// objectDeletionWaitInterval is the interval to wait between checks for deletion.
var objectDeletionWaitInterval = time.Second
// waitForObjectDeletion refreshes the object, waiting until it is deleted, a timeout is reached, or
// an error is encountered. It checks once a second.
func waitForObjectDeletion(info *resource.Info, timeout time.Duration) error {
copied := *info
info = &copied
// TODO: refactor Reaper so that we can pass the "wait" option into it, and then check for UID change.
return wait.PollImmediate(objectDeletionWaitInterval, timeout, func() (bool, error) {
switch err := info.Get(); {
case err == nil:
return false, nil
case errors.IsNotFound(err):
return true, nil
default:
return false, err
}
})
}

View File

@ -23,19 +23,15 @@ import (
"net/http"
"strings"
"testing"
"time"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest/fake"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubectl"
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/genericclioptions"
"k8s.io/kubernetes/pkg/kubectl/genericclioptions/resource"
"k8s.io/kubernetes/pkg/kubectl/scheme"
@ -259,34 +255,10 @@ func TestDeleteObject(t *testing.T) {
}
}
type fakeReaper struct {
namespace, name string
timeout time.Duration
deleteOptions *metav1.DeleteOptions
err error
}
func (r *fakeReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
r.namespace, r.name = namespace, name
r.timeout = timeout
r.deleteOptions = gracePeriod
return r.err
}
type fakeReaperFactory struct {
cmdutil.Factory
reaper kubectl.Reaper
}
func (f *fakeReaperFactory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) {
return f.reaper, nil
}
func TestDeleteObjectGraceZero(t *testing.T) {
initTestErrorHandler(t)
pods, _, _ := testData()
objectDeletionWaitInterval = time.Millisecond
count := 0
tf := cmdtesting.NewTestFactory()
defer tf.Cleanup()
@ -318,10 +290,8 @@ func TestDeleteObjectGraceZero(t *testing.T) {
}
tf.Namespace = "test"
reaper := &fakeReaper{}
fake := &fakeReaperFactory{Factory: tf, reaper: reaper}
streams, _, buf, errBuf := genericclioptions.NewTestIOStreams()
cmd := NewCmdDelete(fake, streams)
cmd := NewCmdDelete(tf, streams)
cmd.Flags().Set("output", "name")
cmd.Flags().Set("grace-period", "0")
cmd.Run(cmd, []string{"pods/nginx"})
@ -330,10 +300,7 @@ func TestDeleteObjectGraceZero(t *testing.T) {
if buf.String() != "pod/nginx\n" {
t.Errorf("unexpected output: %s\n---\n%s", buf.String(), errBuf.String())
}
if reaper.deleteOptions == nil || reaper.deleteOptions.GracePeriodSeconds == nil || *reaper.deleteOptions.GracePeriodSeconds != 1 {
t.Errorf("unexpected reaper options: %#v", reaper)
}
if count != 4 {
if count != 0 {
t.Errorf("unexpected calls to GET: %d", count)
}
}

View File

@ -43,7 +43,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/genericclioptions"
@ -594,7 +593,7 @@ func (o *DrainOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, g
}
}
podArray := []corev1.Pod{pod}
_, err = o.waitForDelete(podArray, kubectl.Interval, time.Duration(math.MaxInt64), true, getPodFn)
_, err = o.waitForDelete(podArray, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn)
if err == nil {
doneCh <- true
} else {
@ -640,7 +639,7 @@ func (o *DrainOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, na
return err
}
}
_, err := o.waitForDelete(pods, kubectl.Interval, globalTimeout, false, getPodFn)
_, err := o.waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn)
return err
}

View File

@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/spf13/cobra"
@ -158,7 +159,6 @@ func (o *ReplaceOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []
//Replace will create a resource if it doesn't exist already, so ignore not found error
deleteOpts.IgnoreNotFound = true
deleteOpts.Reaper = f.Reaper
if o.PrintFlags.OutputFormat != nil {
deleteOpts.Output = *o.PrintFlags.OutputFormat
}
@ -273,25 +273,20 @@ func (o *ReplaceOptions) forceReplace() error {
return err
}
var err error
// By default use a reaper to delete all related resources.
if o.DeleteOptions.Cascade {
glog.Warningf("\"cascade\" is set, kubectl will delete and re-create all resources managed by this resource (e.g. Pods created by a ReplicationController). Consider using \"kubectl rolling-update\" if you want to update a ReplicationController together with its Pods.")
err = o.DeleteOptions.ReapResult(r, o.DeleteOptions.Cascade, false)
} else {
err = o.DeleteOptions.DeleteResult(r)
if err := o.DeleteOptions.DeleteResult(r); err != nil {
return err
}
timeout := o.DeleteOptions.Timeout
if timeout == 0 {
timeout = kubectl.Timeout
timeout = 5 * time.Minute
}
err = r.Visit(func(info *resource.Info, err error) error {
err := r.Visit(func(info *resource.Info, err error) error {
if err != nil {
return err
}
return wait.PollImmediate(kubectl.Interval, timeout, func() (bool, error) {
return wait.PollImmediate(1*time.Second, timeout, func() (bool, error) {
if err := info.Get(); !errors.IsNotFound(err) {
return false, err
}

View File

@ -227,7 +227,6 @@ func (o *RunOptions) Complete(f cmdutil.Factory, cmd *cobra.Command) error {
deleteOpts.IgnoreNotFound = true
deleteOpts.WaitForDeletion = false
deleteOpts.GracePeriod = -1
deleteOpts.Reaper = f.Reaper
o.DeleteOptions = deleteOpts
@ -459,14 +458,7 @@ func (o *RunOptions) removeCreatedObjects(f cmdutil.Factory, createdObjects []*R
ResourceNames(obj.Mapping.Resource.Resource+"."+obj.Mapping.Resource.Group, name).
Flatten().
Do()
// Note: we pass in "true" for the "quiet" parameter because
// ReadResult will only print one thing based on the "quiet"
// flag, and that's the "pod xxx deleted" message. If they
// asked for us to remove the pod (via --rm) then telling them
// its been deleted is unnecessary since that's what they asked
// for. We should only print something if the "rm" fails.
err = o.DeleteOptions.ReapResult(r, true, true)
if err != nil {
if err := o.DeleteOptions.DeleteResult(r); err != nil {
return err
}
}

View File

@ -214,11 +214,11 @@ func (o *ScaleOptions) RunScale() error {
}
precondition := &kubectl.ScalePrecondition{Size: o.CurrentReplicas, ResourceVersion: o.ResourceVersion}
retry := kubectl.NewRetryParams(kubectl.Interval, kubectl.Timeout)
retry := kubectl.NewRetryParams(1*time.Second, 5*time.Minute)
var waitForReplicas *kubectl.RetryParams
if o.Timeout != 0 {
waitForReplicas = kubectl.NewRetryParams(kubectl.Interval, timeout)
waitForReplicas = kubectl.NewRetryParams(1*time.Second, timeout)
}
counter := 0

View File

@ -105,8 +105,6 @@ type ObjectMappingFactory interface {
type BuilderFactory interface {
// ScaleClient gives you back scale getter
ScaleClient() (scaleclient.ScalesGetter, error)
// Returns a Reaper for gracefully shutting down resources.
Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error)
}
type factory struct {

View File

@ -19,10 +19,8 @@ limitations under the License.
package util
import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/kubectl"
)
type ring2Factory struct {
@ -56,20 +54,3 @@ func (f *ring2Factory) ScaleClient() (scaleclient.ScalesGetter, error) {
return scaleclient.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver), nil
}
func (f *ring2Factory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) {
clientset, clientsetErr := f.clientAccessFactory.ClientSet()
if clientsetErr != nil {
return nil, clientsetErr
}
scaler, err := f.ScaleClient()
if err != nil {
return nil, err
}
reaper, reaperErr := kubectl.ReaperFor(mapping.GroupVersionKind.GroupKind(), clientset, scaler)
if kubectl.IsNoSuchReaperError(reaperErr) {
return nil, reaperErr
}
return reaper, reaperErr
}

View File

@ -1,504 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubectl
import (
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/kubectl/cmd/scalejob"
)
const (
Interval = time.Second * 1
Timeout = time.Minute * 5
)
// A Reaper terminates an object as gracefully as possible.
type Reaper interface {
// Stop a given object within a namespace. timeout is how long we'll
// wait for the termination to be successful. gracePeriod is time given
// to an API object for it to delete itself cleanly (e.g., pod
// shutdown). It may or may not be supported by the API object.
Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error
}
type NoSuchReaperError struct {
kind schema.GroupKind
}
func (n *NoSuchReaperError) Error() string {
return fmt.Sprintf("no reaper has been implemented for %v", n.kind)
}
func IsNoSuchReaperError(err error) bool {
_, ok := err.(*NoSuchReaperError)
return ok
}
func ReaperFor(kind schema.GroupKind, c internalclientset.Interface, sc scaleclient.ScalesGetter) (Reaper, error) {
switch kind {
case api.Kind("ReplicationController"):
return &ReplicationControllerReaper{c.Core(), Interval, Timeout, sc}, nil
case extensions.Kind("ReplicaSet"), apps.Kind("ReplicaSet"):
return &ReplicaSetReaper{c.Extensions(), Interval, Timeout, sc, schema.GroupResource{Group: kind.Group, Resource: "replicasets"}}, nil
case extensions.Kind("DaemonSet"), apps.Kind("DaemonSet"):
return &DaemonSetReaper{c.Extensions(), Interval, Timeout}, nil
case api.Kind("Pod"):
return &PodReaper{c.Core()}, nil
case batch.Kind("Job"):
return &JobReaper{c.Batch(), c.Core(), Interval, Timeout}, nil
case apps.Kind("StatefulSet"):
return &StatefulSetReaper{c.Apps(), c.Core(), Interval, Timeout, sc}, nil
case extensions.Kind("Deployment"), apps.Kind("Deployment"):
return &DeploymentReaper{c.Extensions(), c.Extensions(), Interval, Timeout, sc, schema.GroupResource{Group: kind.Group, Resource: "deployments"}}, nil
}
return nil, &NoSuchReaperError{kind}
}
func ReaperForReplicationController(rcClient coreclient.ReplicationControllersGetter, scaleClient scaleclient.ScalesGetter, timeout time.Duration) (Reaper, error) {
return &ReplicationControllerReaper{rcClient, Interval, timeout, scaleClient}, nil
}
type ReplicationControllerReaper struct {
client coreclient.ReplicationControllersGetter
pollInterval, timeout time.Duration
scaleClient scaleclient.ScalesGetter
}
type ReplicaSetReaper struct {
client extensionsclient.ReplicaSetsGetter
pollInterval, timeout time.Duration
scaleClient scaleclient.ScalesGetter
gr schema.GroupResource
}
type DaemonSetReaper struct {
client extensionsclient.DaemonSetsGetter
pollInterval, timeout time.Duration
}
type JobReaper struct {
client batchclient.JobsGetter
podClient coreclient.PodsGetter
pollInterval, timeout time.Duration
}
type DeploymentReaper struct {
dClient extensionsclient.DeploymentsGetter
rsClient extensionsclient.ReplicaSetsGetter
pollInterval, timeout time.Duration
scaleClient scaleclient.ScalesGetter
gr schema.GroupResource
}
type PodReaper struct {
client coreclient.PodsGetter
}
type StatefulSetReaper struct {
client appsclient.StatefulSetsGetter
podClient coreclient.PodsGetter
pollInterval, timeout time.Duration
scaleClient scaleclient.ScalesGetter
}
// getOverlappingControllers finds rcs that this controller overlaps, as well as rcs overlapping this controller.
func getOverlappingControllers(rcClient coreclient.ReplicationControllerInterface, rc *api.ReplicationController) ([]api.ReplicationController, error) {
rcs, err := rcClient.List(metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("error getting replication controllers: %v", err)
}
var matchingRCs []api.ReplicationController
rcLabels := labels.Set(rc.Spec.Selector)
for _, controller := range rcs.Items {
newRCLabels := labels.Set(controller.Spec.Selector)
if labels.SelectorFromSet(newRCLabels).Matches(rcLabels) || labels.SelectorFromSet(rcLabels).Matches(newRCLabels) {
matchingRCs = append(matchingRCs, controller)
}
}
return matchingRCs, nil
}
func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
rc := reaper.client.ReplicationControllers(namespace)
scaler := NewScaler(reaper.scaleClient)
ctrl, err := rc.Get(name, metav1.GetOptions{})
if err != nil {
return err
}
if timeout == 0 {
timeout = Timeout + time.Duration(10*ctrl.Spec.Replicas)*time.Second
}
// The rc manager will try and detect all matching rcs for a pod's labels,
// and only sync the oldest one. This means if we have a pod with labels
// [(k1: v1), (k2: v2)] and two rcs: rc1 with selector [(k1=v1)], and rc2 with selector [(k1=v1),(k2=v2)],
// the rc manager will sync the older of the two rcs.
//
// If there are rcs with a superset of labels, eg:
// deleting: (k1=v1), superset: (k2=v2, k1=v1)
// - It isn't safe to delete the rc because there could be a pod with labels
// (k1=v1) that isn't managed by the superset rc. We can't scale it down
// either, because there could be a pod (k2=v2, k1=v1) that it deletes
// causing a fight with the superset rc.
// If there are rcs with a subset of labels, eg:
// deleting: (k2=v2, k1=v1), subset: (k1=v1), superset: (k2=v2, k1=v1, k3=v3)
// - Even if it's safe to delete this rc without a scale down because all it's pods
// are being controlled by the subset rc the code returns an error.
// In theory, creating overlapping controllers is user error, so the loop below
// tries to account for this logic only in the common case, where we end up
// with multiple rcs that have an exact match on selectors.
overlappingCtrls, err := getOverlappingControllers(rc, ctrl)
if err != nil {
return fmt.Errorf("error getting replication controllers: %v", err)
}
exactMatchRCs := []api.ReplicationController{}
overlapRCs := []string{}
for _, overlappingRC := range overlappingCtrls {
if len(overlappingRC.Spec.Selector) == len(ctrl.Spec.Selector) {
exactMatchRCs = append(exactMatchRCs, overlappingRC)
} else {
overlapRCs = append(overlapRCs, overlappingRC.Name)
}
}
if len(overlapRCs) > 0 {
return fmt.Errorf(
"Detected overlapping controllers for rc %v: %v, please manage deletion individually with --cascade=false.",
ctrl.Name, strings.Join(overlapRCs, ","))
}
if len(exactMatchRCs) == 1 {
// No overlapping controllers.
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil && !errors.IsNotFound(err) {
return err
}
}
// Using a background deletion policy because the replication controller
// has already been scaled down.
policy := metav1.DeletePropagationBackground
deleteOptions := &metav1.DeleteOptions{PropagationPolicy: &policy}
return rc.Delete(name, deleteOptions)
}
// TODO(madhusudancs): Implement it when controllerRef is implemented - https://github.com/kubernetes/kubernetes/issues/2210
// getOverlappingReplicaSets finds ReplicaSets that this ReplicaSet overlaps, as well as ReplicaSets overlapping this ReplicaSet.
func getOverlappingReplicaSets(c extensionsclient.ReplicaSetInterface, rs *extensions.ReplicaSet) ([]extensions.ReplicaSet, []extensions.ReplicaSet, error) {
var overlappingRSs, exactMatchRSs []extensions.ReplicaSet
return overlappingRSs, exactMatchRSs, nil
}
func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
rsc := reaper.client.ReplicaSets(namespace)
scaler := NewScaler(reaper.scaleClient)
rs, err := rsc.Get(name, metav1.GetOptions{})
if err != nil {
return err
}
if timeout == 0 {
timeout = Timeout + time.Duration(10*rs.Spec.Replicas)*time.Second
}
// The ReplicaSet controller will try and detect all matching ReplicaSets
// for a pod's labels, and only sync the oldest one. This means if we have
// a pod with labels [(k1: v1), (k2: v2)] and two ReplicaSets: rs1 with
// selector [(k1=v1)], and rs2 with selector [(k1=v1),(k2=v2)], the
// ReplicaSet controller will sync the older of the two ReplicaSets.
//
// If there are ReplicaSets with a superset of labels, eg:
// deleting: (k1=v1), superset: (k2=v2, k1=v1)
// - It isn't safe to delete the ReplicaSet because there could be a pod
// with labels (k1=v1) that isn't managed by the superset ReplicaSet.
// We can't scale it down either, because there could be a pod
// (k2=v2, k1=v1) that it deletes causing a fight with the superset
// ReplicaSet.
// If there are ReplicaSets with a subset of labels, eg:
// deleting: (k2=v2, k1=v1), subset: (k1=v1), superset: (k2=v2, k1=v1, k3=v3)
// - Even if it's safe to delete this ReplicaSet without a scale down because
// all it's pods are being controlled by the subset ReplicaSet the code
// returns an error.
// In theory, creating overlapping ReplicaSets is user error, so the loop below
// tries to account for this logic only in the common case, where we end up
// with multiple ReplicaSets that have an exact match on selectors.
// TODO(madhusudancs): Re-evaluate again when controllerRef is implemented -
// https://github.com/kubernetes/kubernetes/issues/2210
overlappingRSs, exactMatchRSs, err := getOverlappingReplicaSets(rsc, rs)
if err != nil {
return fmt.Errorf("error getting ReplicaSets: %v", err)
}
if len(overlappingRSs) > 0 {
var names []string
for _, overlappingRS := range overlappingRSs {
names = append(names, overlappingRS.Name)
}
return fmt.Errorf(
"Detected overlapping ReplicaSets for ReplicaSet %v: %v, please manage deletion individually with --cascade=false.",
rs.Name, strings.Join(names, ","))
}
if len(exactMatchRSs) == 0 {
// No overlapping ReplicaSets.
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas, reaper.gr); err != nil && !errors.IsNotFound(err) {
return err
}
}
// Using a background deletion policy because the replica set has already
// been scaled down.
policy := metav1.DeletePropagationBackground
deleteOptions := &metav1.DeleteOptions{PropagationPolicy: &policy}
return rsc.Delete(name, deleteOptions)
}
func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
ds, err := reaper.client.DaemonSets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return err
}
// We set the nodeSelector to a random label. This label is nearly guaranteed
// to not be set on any node so the DameonSetController will start deleting
// daemon pods. Once it's done deleting the daemon pods, it's safe to delete
// the DaemonSet.
ds.Spec.Template.Spec.NodeSelector = map[string]string{
string(uuid.NewUUID()): string(uuid.NewUUID()),
}
// force update to avoid version conflict
ds.ResourceVersion = ""
if ds, err = reaper.client.DaemonSets(namespace).Update(ds); err != nil {
return err
}
// Wait for the daemon set controller to kill all the daemon pods.
if err := wait.Poll(reaper.pollInterval, reaper.timeout, func() (bool, error) {
updatedDS, err := reaper.client.DaemonSets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return false, nil
}
return updatedDS.Status.CurrentNumberScheduled+updatedDS.Status.NumberMisscheduled == 0, nil
}); err != nil {
return err
}
// Using a background deletion policy because the daemon set has already
// been scaled down.
policy := metav1.DeletePropagationBackground
deleteOptions := &metav1.DeleteOptions{PropagationPolicy: &policy}
return reaper.client.DaemonSets(namespace).Delete(name, deleteOptions)
}
func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
statefulsets := reaper.client.StatefulSets(namespace)
scaler := NewScaler(reaper.scaleClient)
ss, err := statefulsets.Get(name, metav1.GetOptions{})
if err != nil {
return err
}
if timeout == 0 {
numReplicas := ss.Spec.Replicas
// See discussion of this behavior here:
// https://github.com/kubernetes/kubernetes/pull/46468#discussion_r118589512
timeout = Timeout + time.Duration(10*numReplicas)*time.Second
}
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForStatefulSet := NewRetryParams(reaper.pollInterval, timeout)
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForStatefulSet, apps.Resource("statefulsets")); err != nil && !errors.IsNotFound(err) {
return err
}
// TODO: Cleanup volumes? We don't want to accidentally delete volumes from
// stop, so just leave this up to the statefulset.
// Using a background deletion policy because the stateful set has already
// been scaled down.
policy := metav1.DeletePropagationBackground
deleteOptions := &metav1.DeleteOptions{PropagationPolicy: &policy}
return statefulsets.Delete(name, deleteOptions)
}
func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
jobs := reaper.client.Jobs(namespace)
pods := reaper.podClient.Pods(namespace)
scaler := &scalejob.JobPsuedoScaler{
JobsClient: reaper.client,
}
job, err := jobs.Get(name, metav1.GetOptions{})
if err != nil {
return err
}
if timeout == 0 {
// we will never have more active pods than job.Spec.Parallelism
parallelism := *job.Spec.Parallelism
timeout = Timeout + time.Duration(10*parallelism)*time.Second
}
// TODO: handle overlapping jobs
retry := &scalejob.RetryParams{Interval: reaper.pollInterval, Timeout: reaper.timeout}
waitForJobs := &scalejob.RetryParams{Interval: reaper.pollInterval, Timeout: reaper.timeout}
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil && !errors.IsNotFound(err) {
return err
}
// at this point only dead pods are left, that should be removed
selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := pods.List(options)
if err != nil {
return err
}
errList := []error{}
for _, pod := range podList.Items {
if err := pods.Delete(pod.Name, gracePeriod); err != nil {
// ignores the error when the pod isn't found
if !errors.IsNotFound(err) {
errList = append(errList, err)
}
}
}
if len(errList) > 0 {
return utilerrors.NewAggregate(errList)
}
// once we have all the pods removed we can safely remove the job itself.
policy := metav1.DeletePropagationBackground
deleteOptions := &metav1.DeleteOptions{PropagationPolicy: &policy}
return jobs.Delete(name, deleteOptions)
}
func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
deployments := reaper.dClient.Deployments(namespace)
rsReaper := &ReplicaSetReaper{reaper.rsClient, reaper.pollInterval, reaper.timeout, reaper.scaleClient, schema.GroupResource{Group: reaper.gr.Group, Resource: "replicasets"}}
deployment, err := reaper.updateDeploymentWithRetries(namespace, name, func(d *extensions.Deployment) {
// set deployment's history and scale to 0
// TODO replace with patch when available: https://github.com/kubernetes/kubernetes/issues/20527
rhl := int32(0)
d.Spec.RevisionHistoryLimit = &rhl
d.Spec.Replicas = 0
d.Spec.Paused = true
})
if err != nil {
return err
}
if deployment.Initializers != nil {
policy := metav1.DeletePropagationBackground
deleteOptions := &metav1.DeleteOptions{PropagationPolicy: &policy}
return deployments.Delete(name, deleteOptions)
}
// Use observedGeneration to determine if the deployment controller noticed the pause.
if err := deploymentutil.WaitForObservedDeploymentInternal(func() (*extensions.Deployment, error) {
return deployments.Get(name, metav1.GetOptions{})
}, deployment.Generation, 1*time.Second, 1*time.Minute); err != nil {
return err
}
// Stop all replica sets belonging to this Deployment.
rss, err := deploymentutil.ListReplicaSetsInternal(deployment,
func(namespace string, options metav1.ListOptions) ([]*extensions.ReplicaSet, error) {
rsList, err := reaper.rsClient.ReplicaSets(namespace).List(options)
if err != nil {
return nil, err
}
rss := make([]*extensions.ReplicaSet, 0, len(rsList.Items))
for i := range rsList.Items {
rss = append(rss, &rsList.Items[i])
}
return rss, nil
})
if err != nil {
return err
}
errList := []error{}
for _, rs := range rss {
if err := rsReaper.Stop(rs.Namespace, rs.Name, timeout, gracePeriod); err != nil {
if errors.IsNotFound(err) {
continue
}
errList = append(errList, err)
}
}
if len(errList) > 0 {
return utilerrors.NewAggregate(errList)
}
// Delete deployment at the end.
// Note: We delete deployment at the end so that if removing RSs fails, we at least have the deployment to retry.
policy := metav1.DeletePropagationBackground
deleteOptions := &metav1.DeleteOptions{PropagationPolicy: &policy}
return deployments.Delete(name, deleteOptions)
}
type updateDeploymentFunc func(d *extensions.Deployment)
func (reaper *DeploymentReaper) updateDeploymentWithRetries(namespace, name string, applyUpdate updateDeploymentFunc) (deployment *extensions.Deployment, err error) {
deployments := reaper.dClient.Deployments(namespace)
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
if deployment, err = deployments.Get(name, metav1.GetOptions{}); err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(deployment)
if deployment, err = deployments.Update(deployment); err == nil {
return true, nil
}
// Retry only on update conflict.
if errors.IsConflict(err) {
return false, nil
}
return false, err
})
return deployment, err
}
func (reaper *PodReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
pods := reaper.client.Pods(namespace)
_, err := pods.Get(name, metav1.GetOptions{})
if err != nil {
return err
}
return pods.Delete(name, gracePeriod)
}

View File

@ -1,837 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubectl
import (
"fmt"
"reflect"
"strings"
"testing"
"time"
autoscalingv1 "k8s.io/api/autoscaling/v1"
"k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/watch"
fakescale "k8s.io/client-go/scale/fake"
testcore "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
)
func TestReplicationControllerStop(t *testing.T) {
name := "foo"
ns := "default"
tests := []struct {
Name string
Objs []runtime.Object
ScaledDown bool
StopError error
ExpectedActions []string
ScaleClientExpectedAction []string
}{
{
Name: "OnlyOneRC",
Objs: []runtime.Object{
&api.ReplicationControllerList{ // LIST
Items: []api.ReplicationController{
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{"k1": "v1"}},
},
},
},
},
ScaledDown: true,
StopError: nil,
ExpectedActions: []string{"get", "list", "delete"},
ScaleClientExpectedAction: []string{"get", "update", "get", "get"},
},
{
Name: "NoOverlapping",
Objs: []runtime.Object{
&api.ReplicationControllerList{ // LIST
Items: []api.ReplicationController{
{
ObjectMeta: metav1.ObjectMeta{
Name: "baz",
Namespace: ns,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{"k3": "v3"}},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{"k1": "v1"}},
},
},
},
},
ScaledDown: true,
StopError: nil,
ExpectedActions: []string{"get", "list", "delete"},
ScaleClientExpectedAction: []string{"get", "update", "get", "get"},
},
{
Name: "OverlappingError",
Objs: []runtime.Object{
&api.ReplicationControllerList{ // LIST
Items: []api.ReplicationController{
{
ObjectMeta: metav1.ObjectMeta{
Name: "baz",
Namespace: ns,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{"k1": "v1", "k2": "v2"}},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{"k1": "v1"}},
},
},
},
},
ScaledDown: false, // scale resource was not scaled down due to overlapping controllers
StopError: fmt.Errorf("Detected overlapping controllers for rc foo: baz, please manage deletion individually with --cascade=false."),
ExpectedActions: []string{"get", "list"},
},
{
Name: "OverlappingButSafeDelete",
Objs: []runtime.Object{
&api.ReplicationControllerList{ // LIST
Items: []api.ReplicationController{
{
ObjectMeta: metav1.ObjectMeta{
Name: "baz",
Namespace: ns,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{"k1": "v1", "k2": "v2", "k3": "v3"}},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "zaz",
Namespace: ns,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{"k1": "v1"}},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{"k1": "v1", "k2": "v2"}},
},
},
},
},
ScaledDown: false, // scale resource was not scaled down due to overlapping controllers
StopError: fmt.Errorf("Detected overlapping controllers for rc foo: baz,zaz, please manage deletion individually with --cascade=false."),
ExpectedActions: []string{"get", "list"},
},
{
Name: "TwoExactMatchRCs",
Objs: []runtime.Object{
&api.ReplicationControllerList{ // LIST
Items: []api.ReplicationController{
{
ObjectMeta: metav1.ObjectMeta{
Name: "zaz",
Namespace: ns,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{"k1": "v1"}},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: api.ReplicationControllerSpec{
Replicas: 0,
Selector: map[string]string{"k1": "v1"}},
},
},
},
},
ScaledDown: false, // scale resource was not scaled down because there is still an additional replica
StopError: nil,
ExpectedActions: []string{"get", "list", "delete"},
},
}
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
copiedForWatch := test.Objs[0].DeepCopyObject()
scaleClient := createFakeScaleClient("replicationcontrollers", "foo", 3, nil)
fake := fake.NewSimpleClientset(test.Objs...)
fakeWatch := watch.NewFake()
fake.PrependWatchReactor("replicationcontrollers", testcore.DefaultWatchReactor(fakeWatch, nil))
go func() {
fakeWatch.Add(copiedForWatch)
}()
reaper := ReplicationControllerReaper{fake.Core(), time.Millisecond, time.Millisecond, scaleClient}
err := reaper.Stop(ns, name, 0, nil)
if !reflect.DeepEqual(err, test.StopError) {
t.Fatalf("unexpected error: %v", err)
}
actions := fake.Actions()
if len(actions) != len(test.ExpectedActions) {
t.Fatalf("unexpected actions: %v, expected %d actions got %d", actions, len(test.ExpectedActions), len(actions))
}
for i, verb := range test.ExpectedActions {
if actions[i].GetResource().GroupResource() != api.Resource("replicationcontrollers") {
t.Errorf("unexpected action: %+v, expected %s-replicationController", actions[i], verb)
}
if actions[i].GetVerb() != verb {
t.Errorf("unexpected action: %+v, expected %s-replicationController", actions[i], verb)
}
}
if test.ScaledDown {
scale, err := scaleClient.Scales(ns).Get(schema.GroupResource{Group: "", Resource: "replicationcontrollers"}, name)
if err != nil {
t.Error(err)
}
if scale.Spec.Replicas != 0 {
t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
}
actions := scaleClient.Actions()
if len(actions) != len(test.ScaleClientExpectedAction) {
t.Errorf("unexpected actions: %v, expected %d actions got %d", actions, len(test.ScaleClientExpectedAction), len(actions))
}
for i, verb := range test.ScaleClientExpectedAction {
if actions[i].GetVerb() != verb {
t.Errorf("unexpected action: %+v, expected %s", actions[i].GetVerb(), verb)
}
}
}
})
}
}
func TestReplicaSetStop(t *testing.T) {
name := "foo"
ns := "default"
tests := []struct {
Name string
Objs []runtime.Object
DiscoveryResources []*metav1.APIResourceList
PathsResources map[string]runtime.Object
ScaledDown bool
StopError error
ExpectedActions []string
ScaleClientExpectedAction []string
}{
{
Name: "OnlyOneRS",
Objs: []runtime.Object{
&extensions.ReplicaSetList{ // LIST
TypeMeta: metav1.TypeMeta{
APIVersion: extensions.SchemeGroupVersion.String(),
},
Items: []extensions.ReplicaSet{
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: extensions.ReplicaSetSpec{
Replicas: 0,
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
},
},
},
},
},
ScaledDown: true,
StopError: nil,
ExpectedActions: []string{"get", "delete"},
ScaleClientExpectedAction: []string{"get", "update", "get", "get"},
},
{
Name: "NoOverlapping",
Objs: []runtime.Object{
&extensions.ReplicaSetList{ // LIST
Items: []extensions.ReplicaSet{
{
ObjectMeta: metav1.ObjectMeta{
Name: "baz",
Namespace: ns,
},
Spec: extensions.ReplicaSetSpec{
Replicas: 0,
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"k3": "v3"}},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: extensions.ReplicaSetSpec{
Replicas: 0,
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
},
},
},
},
},
ScaledDown: true,
StopError: nil,
ExpectedActions: []string{"get", "delete"},
ScaleClientExpectedAction: []string{"get", "update", "get", "get"},
},
// TODO: Implement tests for overlapping replica sets, similar to replication controllers,
// when the overlapping checks are implemented for replica sets.
}
for _, test := range tests {
fake := fake.NewSimpleClientset(test.Objs...)
scaleClient := createFakeScaleClient("replicasets", "foo", 3, nil)
reaper := ReplicaSetReaper{fake.Extensions(), time.Millisecond, time.Millisecond, scaleClient, schema.GroupResource{Group: "extensions", Resource: "replicasets"}}
err := reaper.Stop(ns, name, 0, nil)
if !reflect.DeepEqual(err, test.StopError) {
t.Errorf("%s unexpected error: %v", test.Name, err)
continue
}
actions := fake.Actions()
if len(actions) != len(test.ExpectedActions) {
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions))
continue
}
for i, verb := range test.ExpectedActions {
if actions[i].GetResource().GroupResource() != extensions.Resource("replicasets") {
t.Errorf("%s unexpected action: %+v, expected %s-replicaSet", test.Name, actions[i], verb)
}
if actions[i].GetVerb() != verb {
t.Errorf("%s unexpected action: %+v, expected %s-replicaSet", test.Name, actions[i], verb)
}
}
if test.ScaledDown {
scale, err := scaleClient.Scales(ns).Get(schema.GroupResource{Group: "extensions", Resource: "replicasets"}, name)
if err != nil {
t.Error(err)
}
if scale.Spec.Replicas != 0 {
t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
}
actions := scaleClient.Actions()
if len(actions) != len(test.ScaleClientExpectedAction) {
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ScaleClientExpectedAction), len(actions))
}
for i, verb := range test.ScaleClientExpectedAction {
if actions[i].GetVerb() != verb {
t.Errorf("%s unexpected action: %+v, expected %s", test.Name, actions[i].GetVerb(), verb)
}
}
}
}
}
func TestJobStop(t *testing.T) {
name := "foo"
ns := "default"
zero := int32(0)
tests := []struct {
Name string
Objs []runtime.Object
StopError error
ExpectedActions []string
}{
{
Name: "OnlyOneJob",
Objs: []runtime.Object{
&batch.JobList{ // LIST
Items: []batch.Job{
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: batch.JobSpec{
Parallelism: &zero,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"k1": "v1"},
},
},
},
},
},
},
StopError: nil,
ExpectedActions: []string{"get:jobs", "get:jobs", "update:jobs",
"get:jobs", "get:jobs", "list:pods", "delete:jobs"},
},
{
Name: "JobWithDeadPods",
Objs: []runtime.Object{
&batch.JobList{ // LIST
Items: []batch.Job{
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: batch.JobSpec{
Parallelism: &zero,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"k1": "v1"},
},
},
},
},
},
&api.PodList{ // LIST
Items: []api.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: ns,
Labels: map[string]string{"k1": "v1"},
},
},
},
},
},
StopError: nil,
ExpectedActions: []string{"get:jobs", "get:jobs", "update:jobs",
"get:jobs", "get:jobs", "list:pods", "delete:pods", "delete:jobs"},
},
}
for _, test := range tests {
fake := fake.NewSimpleClientset(test.Objs...)
reaper := JobReaper{fake.Batch(), fake.Core(), time.Millisecond, time.Millisecond}
err := reaper.Stop(ns, name, 0, nil)
if !reflect.DeepEqual(err, test.StopError) {
t.Errorf("%s unexpected error: %v", test.Name, err)
continue
}
actions := fake.Actions()
if len(actions) != len(test.ExpectedActions) {
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions))
continue
}
for i, expAction := range test.ExpectedActions {
action := strings.Split(expAction, ":")
if actions[i].GetVerb() != action[0] {
t.Errorf("%s unexpected verb: %+v, expected %s", test.Name, actions[i], expAction)
}
if actions[i].GetResource().Resource != action[1] {
t.Errorf("%s unexpected resource: %+v, expected %s", test.Name, actions[i], expAction)
}
}
}
}
func TestDeploymentStop(t *testing.T) {
name := "foo"
ns := "default"
deployment := extensions.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: uuid.NewUUID(),
Namespace: ns,
},
Spec: extensions.DeploymentSpec{
Replicas: 0,
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
},
Status: extensions.DeploymentStatus{
Replicas: 0,
},
}
trueVar := true
tests := []struct {
Name string
Objs []runtime.Object
ScaledDown bool
StopError error
ExpectedActions []string
ScaleClientExpectedAction []string
}{
{
Name: "SimpleDeployment",
Objs: []runtime.Object{
&extensions.Deployment{ // GET
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: extensions.DeploymentSpec{
Replicas: 0,
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
},
Status: extensions.DeploymentStatus{
Replicas: 0,
},
},
},
StopError: nil,
ExpectedActions: []string{"get:deployments", "update:deployments",
"get:deployments", "list:replicasets", "delete:deployments"},
},
{
Name: "Deployment with single replicaset",
Objs: []runtime.Object{
&deployment, // GET
&extensions.ReplicaSetList{ // LIST
Items: []extensions.ReplicaSet{
// ReplicaSet owned by this Deployment.
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
Labels: map[string]string{"k1": "v1"},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: extensions.SchemeGroupVersion.String(),
Kind: "Deployment",
Name: deployment.Name,
UID: deployment.UID,
Controller: &trueVar,
},
},
},
Spec: extensions.ReplicaSetSpec{},
},
// ReplicaSet owned by something else (should be ignored).
{
ObjectMeta: metav1.ObjectMeta{
Name: "rs2",
Namespace: ns,
Labels: map[string]string{"k1": "v1"},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: extensions.SchemeGroupVersion.String(),
Kind: "Deployment",
Name: "somethingelse",
UID: uuid.NewUUID(),
Controller: &trueVar,
},
},
},
Spec: extensions.ReplicaSetSpec{},
},
},
},
},
ScaledDown: true,
StopError: nil,
ExpectedActions: []string{"get:deployments", "update:deployments",
"get:deployments", "list:replicasets", "get:replicasets",
"delete:replicasets", "delete:deployments"},
ScaleClientExpectedAction: []string{"get", "update", "get", "get"},
},
}
for _, test := range tests {
scaleClient := createFakeScaleClient("deployments", "foo", 3, nil)
fake := fake.NewSimpleClientset(test.Objs...)
reaper := DeploymentReaper{fake.Extensions(), fake.Extensions(), time.Millisecond, time.Millisecond, scaleClient, schema.GroupResource{Group: "extensions", Resource: "deployments"}}
err := reaper.Stop(ns, name, 0, nil)
if !reflect.DeepEqual(err, test.StopError) {
t.Errorf("%s unexpected error: %v", test.Name, err)
continue
}
actions := fake.Actions()
if len(actions) != len(test.ExpectedActions) {
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions))
continue
}
for i, expAction := range test.ExpectedActions {
action := strings.Split(expAction, ":")
if actions[i].GetVerb() != action[0] {
t.Errorf("%s unexpected verb: %+v, expected %s", test.Name, actions[i], expAction)
}
if actions[i].GetResource().Resource != action[1] {
t.Errorf("%s unexpected resource: %+v, expected %s", test.Name, actions[i], expAction)
}
if len(action) == 3 && actions[i].GetSubresource() != action[2] {
t.Errorf("%s unexpected subresource: %+v, expected %s", test.Name, actions[i], expAction)
}
}
if test.ScaledDown {
scale, err := scaleClient.Scales(ns).Get(schema.GroupResource{Group: "extensions", Resource: "replicaset"}, name)
if err != nil {
t.Error(err)
}
if scale.Spec.Replicas != 0 {
t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
}
actions := scaleClient.Actions()
if len(actions) != len(test.ScaleClientExpectedAction) {
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ScaleClientExpectedAction), len(actions))
}
for i, verb := range test.ScaleClientExpectedAction {
if actions[i].GetVerb() != verb {
t.Errorf("%s unexpected action: %+v, expected %s", test.Name, actions[i].GetVerb(), verb)
}
}
}
}
}
type noSuchPod struct {
coreclient.PodInterface
}
func (c *noSuchPod) Get(name string, options metav1.GetOptions) (*api.Pod, error) {
return nil, fmt.Errorf("%s does not exist", name)
}
type noDeletePod struct {
coreclient.PodInterface
}
func (c *noDeletePod) Delete(name string, o *metav1.DeleteOptions) error {
return fmt.Errorf("I'm afraid I can't do that, Dave")
}
type reaperFake struct {
*fake.Clientset
noSuchPod, noDeletePod bool
}
func (c *reaperFake) Core() coreclient.CoreInterface {
return &reaperCoreFake{c.Clientset.Core(), c.noSuchPod, c.noDeletePod}
}
type reaperCoreFake struct {
coreclient.CoreInterface
noSuchPod, noDeletePod bool
}
func (c *reaperCoreFake) Pods(namespace string) coreclient.PodInterface {
pods := c.CoreInterface.Pods(namespace)
if c.noSuchPod {
return &noSuchPod{pods}
}
if c.noDeletePod {
return &noDeletePod{pods}
}
return pods
}
func newPod() *api.Pod {
return &api.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceDefault, Name: "foo"}}
}
func TestSimpleStop(t *testing.T) {
tests := []struct {
fake *reaperFake
kind schema.GroupKind
actions []testcore.Action
expectError bool
test string
}{
{
fake: &reaperFake{
Clientset: fake.NewSimpleClientset(newPod()),
},
kind: api.Kind("Pod"),
actions: []testcore.Action{
testcore.NewGetAction(api.Resource("pods").WithVersion(""), metav1.NamespaceDefault, "foo"),
testcore.NewDeleteAction(api.Resource("pods").WithVersion(""), metav1.NamespaceDefault, "foo"),
},
expectError: false,
test: "stop pod succeeds",
},
{
fake: &reaperFake{
Clientset: fake.NewSimpleClientset(),
noSuchPod: true,
},
kind: api.Kind("Pod"),
actions: []testcore.Action{},
expectError: true,
test: "stop pod fails, no pod",
},
{
fake: &reaperFake{
Clientset: fake.NewSimpleClientset(newPod()),
noDeletePod: true,
},
kind: api.Kind("Pod"),
actions: []testcore.Action{
testcore.NewGetAction(api.Resource("pods").WithVersion(""), metav1.NamespaceDefault, "foo"),
},
expectError: true,
test: "stop pod fails, can't delete",
},
}
for _, test := range tests {
fake := test.fake
reaper, err := ReaperFor(test.kind, fake, nil)
if err != nil {
t.Errorf("unexpected error: %v (%s)", err, test.test)
}
err = reaper.Stop("default", "foo", 0, nil)
if err != nil && !test.expectError {
t.Errorf("unexpected error: %v (%s)", err, test.test)
}
if err == nil {
if test.expectError {
t.Errorf("unexpected non-error: %v (%s)", err, test.test)
}
}
actions := fake.Actions()
if len(test.actions) != len(actions) {
t.Errorf("unexpected actions: %v; expected %v (%s)", actions, test.actions, test.test)
}
for i, action := range actions {
testAction := test.actions[i]
if action != testAction {
t.Errorf("unexpected action: %#v; expected %v (%s)", action, testAction, test.test)
}
}
}
}
func TestDeploymentNotFoundError(t *testing.T) {
name := "foo"
ns := "default"
deployment := &extensions.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: extensions.DeploymentSpec{
Replicas: 0,
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
},
Status: extensions.DeploymentStatus{
Replicas: 0,
},
}
fake := fake.NewSimpleClientset(
deployment,
&extensions.ReplicaSetList{Items: []extensions.ReplicaSet{
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: extensions.ReplicaSetSpec{},
},
},
},
)
fake.AddReactor("get", "replicasets", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.NewNotFound(api.Resource("replicaset"), "doesn't-matter")
})
reaper := DeploymentReaper{fake.Extensions(), fake.Extensions(), time.Millisecond, time.Millisecond, nil, schema.GroupResource{}}
if err := reaper.Stop(ns, name, 0, nil); err != nil {
t.Fatalf("unexpected error: %#v", err)
}
}
func createFakeScaleClient(resource string, resourceName string, replicas int, errorsOnVerb map[string]*kerrors.StatusError) *fakescale.FakeScaleClient {
shouldReturnAnError := func(verb string) (*kerrors.StatusError, bool) {
if anError, anErrorExists := errorsOnVerb[verb]; anErrorExists {
return anError, true
}
return &kerrors.StatusError{}, false
}
newReplicas := int32(replicas)
scaleClient := &fakescale.FakeScaleClient{}
scaleClient.AddReactor("get", resource, func(rawAction testcore.Action) (handled bool, ret runtime.Object, err error) {
action := rawAction.(testcore.GetAction)
if action.GetName() != resourceName {
return true, nil, fmt.Errorf("expected = %s, got = %s", resourceName, action.GetName())
}
if anError, should := shouldReturnAnError("get"); should {
return true, nil, anError
}
obj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: action.GetName(),
Namespace: action.GetNamespace(),
},
Spec: autoscalingv1.ScaleSpec{
Replicas: newReplicas,
},
}
return true, obj, nil
})
scaleClient.AddReactor("update", resource, func(rawAction testcore.Action) (handled bool, ret runtime.Object, err error) {
action := rawAction.(testcore.UpdateAction)
obj := action.GetObject().(*autoscalingv1.Scale)
if obj.Name != resourceName {
return true, nil, fmt.Errorf("expected = %s, got = %s", resourceName, obj.Name)
}
if anError, should := shouldReturnAnError("update"); should {
return true, nil, anError
}
newReplicas = obj.Spec.Replicas
return true, &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: obj.Name,
Namespace: action.GetNamespace(),
},
Spec: autoscalingv1.ScaleSpec{
Replicas: newReplicas,
},
}, nil
})
return scaleClient
}

View File

@ -21,10 +21,14 @@ import (
"testing"
"time"
autoscalingv1 "k8s.io/api/autoscaling/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
fakescale "k8s.io/client-go/scale/fake"
testcore "k8s.io/client-go/testing"
api "k8s.io/kubernetes/pkg/apis/core"
)
@ -626,3 +630,54 @@ func TestGenericScale(t *testing.T) {
})
}
}
func createFakeScaleClient(resource string, resourceName string, replicas int, errorsOnVerb map[string]*kerrors.StatusError) *fakescale.FakeScaleClient {
shouldReturnAnError := func(verb string) (*kerrors.StatusError, bool) {
if anError, anErrorExists := errorsOnVerb[verb]; anErrorExists {
return anError, true
}
return &kerrors.StatusError{}, false
}
newReplicas := int32(replicas)
scaleClient := &fakescale.FakeScaleClient{}
scaleClient.AddReactor("get", resource, func(rawAction testcore.Action) (handled bool, ret runtime.Object, err error) {
action := rawAction.(testcore.GetAction)
if action.GetName() != resourceName {
return true, nil, fmt.Errorf("expected = %s, got = %s", resourceName, action.GetName())
}
if anError, should := shouldReturnAnError("get"); should {
return true, nil, anError
}
obj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: action.GetName(),
Namespace: action.GetNamespace(),
},
Spec: autoscalingv1.ScaleSpec{
Replicas: newReplicas,
},
}
return true, obj, nil
})
scaleClient.AddReactor("update", resource, func(rawAction testcore.Action) (handled bool, ret runtime.Object, err error) {
action := rawAction.(testcore.UpdateAction)
obj := action.GetObject().(*autoscalingv1.Scale)
if obj.Name != resourceName {
return true, nil, fmt.Errorf("expected = %s, got = %s", resourceName, obj.Name)
}
if anError, should := shouldReturnAnError("update"); should {
return true, nil, anError
}
newReplicas = obj.Spec.Replicas
return true, &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: obj.Name,
Namespace: action.GetNamespace(),
},
Spec: autoscalingv1.ScaleSpec{
Replicas: newReplicas,
},
}, nil
})
return scaleClient
}

View File

@ -29,14 +29,12 @@ go_library(
"//pkg/apis/batch:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/controller/daemon:go_default_library",
"//pkg/controller/deployment/util:go_default_library",
"//pkg/controller/job:go_default_library",
"//pkg/controller/nodelifecycle:go_default_library",
"//pkg/controller/replicaset:go_default_library",
"//pkg/controller/replication:go_default_library",
"//pkg/kubectl:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/scheduler/schedulercache:go_default_library",
"//pkg/util/pointer:go_default_library",
@ -67,7 +65,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/test/e2e/framework"
)
@ -207,11 +206,7 @@ var _ = SIGDescribe("CronJob", func() {
By("Deleting the job")
job := cronJob.Status.Active[0]
reaper, err := kubectl.ReaperFor(batchinternal.Kind("Job"), f.InternalClientset, f.ScalesGetter)
Expect(err).NotTo(HaveOccurred())
timeout := 1 * time.Minute
err = reaper.Stop(f.Namespace.Name, job.Name, timeout, metav1.NewDeleteOptions(0))
Expect(err).NotTo(HaveOccurred())
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
By("Ensuring job was deleted")
_, err = framework.GetJob(f.ClientSet, f.Namespace.Name, job.Name)

View File

@ -36,7 +36,6 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
"k8s.io/kubernetes/test/e2e/framework"
@ -69,11 +68,8 @@ var _ = SIGDescribe("Daemon set [Serial]", func() {
Expect(err).NotTo(HaveOccurred(), "unable to dump DaemonSets")
if daemonsets != nil && len(daemonsets.Items) > 0 {
for _, ds := range daemonsets.Items {
By(fmt.Sprintf("Deleting DaemonSet %q with reaper", ds.Name))
dsReaper, err := kubectl.ReaperFor(extensionsinternal.Kind("DaemonSet"), f.InternalClientset, f.ScalesGetter)
Expect(err).NotTo(HaveOccurred())
err = dsReaper.Stop(f.Namespace.Name, ds.Name, 0, nil)
Expect(err).NotTo(HaveOccurred())
By(fmt.Sprintf("Deleting DaemonSet %q", ds.Name))
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, &ds))
Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to be reaped")
}

View File

@ -36,11 +36,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
appsinternal "k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/kubectl"
utilpointer "k8s.io/kubernetes/pkg/util/pointer"
"k8s.io/kubernetes/test/e2e/framework"
testutil "k8s.io/kubernetes/test/utils"
@ -160,17 +157,12 @@ func newDeploymentRollback(name string, annotations map[string]string, revision
}
}
func stopDeployment(c clientset.Interface, internalClient internalclientset.Interface, scaleClient scaleclient.ScalesGetter, ns, deploymentName string) {
func stopDeployment(c clientset.Interface, ns, deploymentName string) {
deployment, err := c.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
framework.Logf("Deleting deployment %s", deploymentName)
reaper, err := kubectl.ReaperFor(appsinternal.Kind("Deployment"), internalClient, scaleClient)
Expect(err).NotTo(HaveOccurred())
timeout := 1 * time.Minute
err = reaper.Stop(ns, deployment.Name, timeout, metav1.NewDeleteOptions(0))
Expect(err).NotTo(HaveOccurred())
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(c, appsinternal.Kind("Deployment"), ns, deployment.Name))
framework.Logf("Ensuring deployment %s was deleted", deploymentName)
_, err = c.AppsV1().Deployments(ns).Get(deployment.Name, metav1.GetOptions{})
@ -203,7 +195,6 @@ func stopDeployment(c clientset.Interface, internalClient internalclientset.Inte
func testDeleteDeployment(f *framework.Framework) {
ns := f.Namespace.Name
c := f.ClientSet
internalClient := f.InternalClientset
deploymentName := "test-new-deployment"
podLabels := map[string]string{"name": NginxImageName}
@ -226,7 +217,7 @@ func testDeleteDeployment(f *framework.Framework) {
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c.AppsV1())
Expect(err).NotTo(HaveOccurred())
Expect(newRS).NotTo(Equal(nilRs))
stopDeployment(c, internalClient, f.ScalesGetter, ns, deploymentName)
stopDeployment(c, ns, deploymentName)
}
func testRollingUpdateDeployment(f *framework.Framework) {

View File

@ -24,7 +24,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
@ -111,11 +110,7 @@ var _ = SIGDescribe("Job", func() {
Expect(err).NotTo(HaveOccurred())
By("delete a job")
reaper, err := kubectl.ReaperFor(batchinternal.Kind("Job"), f.InternalClientset, f.ScalesGetter)
Expect(err).NotTo(HaveOccurred())
timeout := 1 * time.Minute
err = reaper.Stop(f.Namespace.Name, job.Name, timeout, metav1.NewDeleteOptions(0))
Expect(err).NotTo(HaveOccurred())
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))
By("Ensuring job was deleted")
_, err = framework.GetJob(f.ClientSet, f.Namespace.Name, job.Name)

View File

@ -348,7 +348,7 @@ var _ = framework.KubeDescribe("Cluster size autoscaler scalability [Slow]", fun
timeToWait := 5 * time.Minute
podsConfig := reserveMemoryRCConfig(f, "unschedulable-pod", unschedulablePodReplicas, totalMemReservation, timeToWait)
framework.RunRC(*podsConfig) // Ignore error (it will occur because pods are unschedulable)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, podsConfig.Name)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, podsConfig.Name)
// Ensure that no new nodes have been added so far.
Expect(framework.NumberOfReadyNodes(f.ClientSet)).To(Equal(nodeCount))
@ -418,7 +418,7 @@ func simpleScaleUpTestWithTolerance(f *framework.Framework, config *scaleUpTestC
}
timeTrack(start, fmt.Sprintf("Scale up to %v", config.expectedResult.nodes))
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, config.extraPods.Name)
return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, config.extraPods.Name)
}
}
@ -501,7 +501,7 @@ func createHostPortPodsWithMemory(f *framework.Framework, id string, replicas, p
err := framework.RunRC(*config)
framework.ExpectNoError(err)
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id)
return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
}
}
@ -541,7 +541,7 @@ func distributeLoad(f *framework.Framework, namespace string, id string, podDist
framework.ExpectNoError(framework.RunRC(*rcConfig))
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id)
return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
}
}

View File

@ -169,7 +169,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
It("shouldn't increase cluster size if pending pod is too large [Feature:ClusterSizeAutoscalingScaleUp]", func() {
By("Creating unschedulable pod")
ReserveMemory(f, "memory-reservation", 1, int(1.1*float64(memAllocatableMb)), false, defaultTimeout)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
By("Waiting for scale up hoping it won't happen")
// Verify that the appropriate event was generated
@ -196,7 +196,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
simpleScaleUpTest := func(unready int) {
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
// Verify that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFuncWithUnready(f.ClientSet,
@ -269,7 +269,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
By("Schedule bunch of pods beyond point of filling default pool but do not request any GPUs")
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
// Verify that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
@ -296,7 +296,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
defer disableAutoscaler(gpuPoolName, 0, 1)
Expect(len(getPoolNodes(f, gpuPoolName))).Should(Equal(1))
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "gpu-pod-rc")
framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "gpu-pod-rc")
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size == nodeCount }, scaleDownTimeout))
@ -319,7 +319,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
By("Schedule more pods than can fit and wait for cluster to scale-up")
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
status, err = waitForScaleUpStatus(c, func(s *scaleUpStatus) bool {
return s.status == caOngoingScaleUpStatus
@ -362,7 +362,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
By("Reserving 0.1x more memory than the cluster holds to trigger scale up")
totalMemoryReservation := int(1.1 * float64(nodeCount*memAllocatableMb+extraMemMb))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
ReserveMemory(f, "memory-reservation", 100, totalMemoryReservation, false, defaultTimeout)
// Verify, that cluster size is increased
@ -386,7 +386,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
It("should increase cluster size if pods are pending due to host port conflict [Feature:ClusterSizeAutoscalingScaleUp]", func() {
scheduling.CreateHostPortPods(f, "host-port", nodeCount+2, false)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "host-port")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "host-port")
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size >= nodeCount+2 }, scaleUpTimeout))
@ -401,12 +401,12 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
}
By("starting a pod with anti-affinity on each node")
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, pods, "some-pod", labels, labels))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "some-pod")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "some-pod")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
By("scheduling extra pods with anti-affinity to existing ones")
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, newPods, "extra-pod", labels, labels))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "extra-pod")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "extra-pod")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+newPods, scaleUpTimeout))
@ -420,14 +420,14 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
"anti-affinity": "yes",
}
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, pods, "some-pod", labels, labels))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "some-pod")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "some-pod")
By("waiting for all pods before triggering scale up")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
By("creating a pod requesting EmptyDir")
framework.ExpectNoError(runVolumeAntiAffinityPods(f, f.Namespace.Name, newPods, "extra-pod", labels, labels, emptyDirVolumes))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "extra-pod")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "extra-pod")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+newPods, scaleUpTimeout))
@ -484,7 +484,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
}
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, pods, "some-pod", labels, labels))
defer func() {
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "some-pod")
framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "some-pod")
glog.Infof("RC and pods not using volume deleted")
}()
@ -497,7 +497,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
volumes := buildVolumes(pv, pvc)
framework.ExpectNoError(runVolumeAntiAffinityPods(f, f.Namespace.Name, newPods, pvcPodName, labels, labels, volumes))
defer func() {
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, pvcPodName)
framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, pvcPodName)
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
}()
@ -602,7 +602,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
defer removeLabels(registeredNodes)
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "node-selector"))
framework.ExpectNoError(framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "node-selector"))
})
It("should scale up correct target pool [Feature:ClusterSizeAutoscalingScaleUp]", func() {
@ -620,7 +620,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
extraPods := extraNodes + 1
totalMemoryReservation := int(float64(extraPods) * 1.5 * float64(memAllocatableMb))
By(fmt.Sprintf("Creating rc with %v pods too big to fit default-pool but fitting extra-pool", extraPods))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
ReserveMemory(f, "memory-reservation", extraPods, totalMemoryReservation, false, defaultTimeout)
// Apparently GKE master is restarted couple minutes after the node pool is added
@ -759,7 +759,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
By("Run a scale-up test")
ReserveMemory(f, "memory-reservation", 1, 100, false, 1*time.Second)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
// Verify that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
@ -872,7 +872,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
framework.TestUnderTemporaryNetworkFailure(c, "default", ntb, testFunction)
} else {
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, defaultTimeout)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, "memory-reservation")
time.Sleep(scaleUpTimeout)
currentNodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
framework.Logf("Currently available nodes: %v, nodes available at the start of test: %v, disabled nodes: %v", len(currentNodes.Items), len(nodes.Items), nodesToBreakCount)
@ -1076,7 +1076,7 @@ func runDrainTest(f *framework.Framework, migSizes map[string]int, namespace str
labelMap := map[string]string{"test_id": testID}
framework.ExpectNoError(runReplicatedPodOnEachNode(f, nodes.Items, namespace, podsPerNode, "reschedulable-pods", labelMap, 0))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, namespace, "reschedulable-pods")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, namespace, "reschedulable-pods")
By("Create a PodDisruptionBudget")
minAvailable := intstr.FromInt(numPods - pdbSize)
@ -1523,7 +1523,7 @@ func reserveMemory(f *framework.Framework, id string, replicas, megabytes int, e
framework.ExpectNoError(err)
}
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id)
return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
}
}
framework.Failf("Failed to reserve memory within timeout")
@ -1929,7 +1929,7 @@ func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespa
func runReplicatedPodOnEachNodeWithCleanup(f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string, memRequest int64) (func(), error) {
err := runReplicatedPodOnEachNode(f, nodes, namespace, podsPerNode, id, labels, memRequest)
return func() {
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, namespace, id)
framework.DeleteRCAndWaitForGC(f.ClientSet, namespace, id)
}, err
}

View File

@ -414,9 +414,9 @@ func (rc *ResourceConsumer) CleanUp() {
// Wait some time to ensure all child goroutines are finished.
time.Sleep(10 * time.Second)
kind := rc.kind.GroupKind()
framework.ExpectNoError(framework.DeleteResourceAndPods(rc.clientSet, rc.internalClientset, rc.scaleClient, kind, rc.nsName, rc.name))
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(rc.clientSet, kind, rc.nsName, rc.name))
framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(rc.name, nil))
framework.ExpectNoError(framework.DeleteResourceAndPods(rc.clientSet, rc.internalClientset, rc.scaleClient, api.Kind("ReplicationController"), rc.nsName, rc.controllerName))
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(rc.clientSet, api.Kind("ReplicationController"), rc.nsName, rc.controllerName))
framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(rc.controllerName, nil))
}

View File

@ -210,6 +210,17 @@ func WaitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.D
})
}
// WaitForJobGone uses c to wait for up to timeout for the Job named jobName in namespace ns to be removed.
func WaitForJobGone(c clientset.Interface, ns, jobName string, timeout time.Duration) error {
return wait.Poll(Poll, timeout, func() (bool, error) {
_, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return true, nil
}
return false, err
})
}
// CheckForAllJobPodsRunning uses c to check in the Job named jobName in ns is running. If the returned error is not
// nil the returned bool is true if the Job is running.
func CheckForAllJobPodsRunning(c clientset.Interface, ns, jobName string, parallelism int32) (bool, error) {

View File

@ -31,7 +31,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
testutils "k8s.io/kubernetes/test/utils"
)
@ -153,10 +152,6 @@ func DeleteRCAndWaitForGC(c clientset.Interface, ns, name string) error {
return DeleteResourceAndWaitForGC(c, api.Kind("ReplicationController"), ns, name)
}
func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, scaleClient scaleclient.ScalesGetter, ns, name string) error {
return DeleteResourceAndPods(clientset, internalClientset, scaleClient, api.Kind("ReplicationController"), ns, name)
}
func ScaleRC(clientset clientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
return ScaleResource(clientset, scalesGetter, ns, name, size, wait, api.Kind("ReplicationController"), api.Resource("replicationcontrollers"))
}

View File

@ -47,7 +47,6 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
scaleclient "k8s.io/client-go/scale"
)
const (
@ -1261,8 +1260,8 @@ func StartServeHostnameService(c clientset.Interface, internalClient internalcli
return podNames, serviceIP, nil
}
func StopServeHostnameService(clientset clientset.Interface, internalClientset internalclientset.Interface, scaleClient scaleclient.ScalesGetter, ns, name string) error {
if err := DeleteRCAndPods(clientset, internalClientset, scaleClient, ns, name); err != nil {
func StopServeHostnameService(clientset clientset.Interface, ns, name string) error {
if err := DeleteRCAndWaitForGC(clientset, ns, name); err != nil {
return err
}
if err := clientset.CoreV1().Services(ns).Delete(name, nil); err != nil {

View File

@ -109,8 +109,6 @@ type TestContextType struct {
DisableLogDump bool
// Path to the GCS artifacts directory to dump logs from nodes. Logexporter gets enabled if this is non-empty.
LogexporterGCSPath string
// If the garbage collector is enabled in the kube-apiserver and kube-controller-manager.
GarbageCollectorEnabled bool
// featureGates is a map of feature names to bools that enable or disable alpha/experimental features.
FeatureGates map[string]bool
// Node e2e specific test context
@ -276,7 +274,6 @@ func RegisterClusterFlags() {
flag.StringVar(&TestContext.IngressUpgradeImage, "ingress-upgrade-image", "", "Image to upgrade to if doing an upgrade test for ingress.")
flag.StringVar(&TestContext.GCEUpgradeScript, "gce-upgrade-script", "", "Script to use to upgrade a GCE cluster.")
flag.BoolVar(&TestContext.CleanStart, "clean-start", false, "If true, purge all namespaces except default and system before running tests. This serves to Cleanup test namespaces from failed/interrupted e2e runs in a long-lived cluster.")
flag.BoolVar(&TestContext.GarbageCollectorEnabled, "garbage-collector-enabled", true, "Set to true if the garbage collector is enabled in the kube-apiserver and kube-controller-manager, then some tests will rely on the garbage collector to delete dependent resources.")
}
// Register flags specific to the node e2e test suite.

View File

@ -3042,50 +3042,6 @@ func getReplicasFromRuntimeObject(obj runtime.Object) (int32, error) {
}
}
// DeleteResourceAndPods deletes a given resource and all pods it spawned
func DeleteResourceAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, scaleClient scaleclient.ScalesGetter, kind schema.GroupKind, ns, name string) error {
By(fmt.Sprintf("deleting %v %s in namespace %s", kind, name, ns))
rtObject, err := getRuntimeObjectForKind(clientset, kind, ns, name)
if err != nil {
if apierrs.IsNotFound(err) {
Logf("%v %s not found: %v", kind, name, err)
return nil
}
return err
}
selector, err := getSelectorFromRuntimeObject(rtObject)
if err != nil {
return err
}
ps, err := testutils.NewPodStore(clientset, ns, selector, fields.Everything())
if err != nil {
return err
}
defer ps.Stop()
startTime := time.Now()
if err := testutils.DeleteResourceUsingReaperWithRetries(internalClientset, kind, ns, name, nil, scaleClient); err != nil {
return fmt.Errorf("error while stopping %v: %s: %v", kind, name, err)
}
deleteTime := time.Since(startTime)
Logf("Deleting %v %s took: %v", kind, name, deleteTime)
err = waitForPodsInactive(ps, 100*time.Millisecond, 10*time.Minute)
if err != nil {
return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
}
terminatePodTime := time.Since(startTime) - deleteTime
Logf("Terminating %v %s pods took: %v", kind, name, terminatePodTime)
// this is to relieve namespace controller's pressure when deleting the
// namespace after a test.
err = waitForPodsGone(ps, 100*time.Millisecond, 10*time.Minute)
if err != nil {
return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
}
gcPodTime := time.Since(startTime) - terminatePodTime
Logf("Garbage collecting %v %s pods took: %v", kind, name, gcPodTime)
return nil
}
// DeleteResourceAndWaitForGC deletes only given resource and waits for GC to delete the pods.
func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns, name string) error {
By(fmt.Sprintf("deleting %v %s in namespace %s, will wait for the garbage collector to delete the pods", kind, name, ns))

View File

@ -1545,8 +1545,11 @@ metadata:
Expect(runOutput).To(ContainSubstring("abcd1234"))
Expect(runOutput).To(ContainSubstring("stdin closed"))
err := framework.WaitForJobGone(c, ns, jobName, 10*time.Second)
Expect(err).NotTo(HaveOccurred())
By("verifying the job " + jobName + " was deleted")
_, err := c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
_, err = c.BatchV1().Jobs(ns).Get(jobName, metav1.GetOptions{})
Expect(err).To(HaveOccurred())
Expect(apierrs.IsNotFound(err)).To(BeTrue())
})

View File

@ -161,7 +161,7 @@ var _ = SIGDescribe("Proxy", func() {
CreatedPods: &pods,
}
Expect(framework.RunRC(cfg)).NotTo(HaveOccurred())
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, cfg.Name)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, cfg.Name)
Expect(framework.WaitForEndpoint(f.ClientSet, f.Namespace.Name, service.Name)).NotTo(HaveOccurred())

View File

@ -339,7 +339,7 @@ var _ = SIGDescribe("Services", func() {
// Stop service 1 and make sure it is gone.
By("stopping service1")
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "service1"))
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, ns, "service1"))
By("verifying service1 is not up")
framework.ExpectNoError(framework.VerifyServeHostnameServiceDown(cs, host, svc1IP, servicePort))
@ -373,13 +373,13 @@ var _ = SIGDescribe("Services", func() {
svc2 := "service2"
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, svc1))
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, ns, svc1))
}()
podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService(svc1), ns, numPods)
Expect(err).NotTo(HaveOccurred())
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, svc2))
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, ns, svc2))
}()
podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService(svc2), ns, numPods)
Expect(err).NotTo(HaveOccurred())
@ -426,7 +426,7 @@ var _ = SIGDescribe("Services", func() {
numPods, servicePort := 3, 80
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "service1"))
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, ns, "service1"))
}()
podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service1"), ns, numPods)
Expect(err).NotTo(HaveOccurred())
@ -453,7 +453,7 @@ var _ = SIGDescribe("Services", func() {
// Create a new service and check if it's not reusing IP.
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "service2"))
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, ns, "service2"))
}()
podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service2"), ns, numPods)
Expect(err).NotTo(HaveOccurred())
@ -1753,7 +1753,7 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
framework.Logf("Health checking %s, http://%s%s, expectedSuccess %v", nodes.Items[n].Name, ipPort, path, expectedSuccess)
Expect(jig.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, path, framework.KubeProxyEndpointLagTimeout, expectedSuccess, threshold)).NotTo(HaveOccurred())
}
framework.ExpectNoError(framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, namespace, serviceName))
framework.ExpectNoError(framework.DeleteRCAndWaitForGC(f.ClientSet, namespace, serviceName))
}
})
@ -1968,7 +1968,7 @@ func execAffinityTestForNonLBService(f *framework.Framework, cs clientset.Interf
_, _, err := framework.StartServeHostnameService(cs, f.InternalClientset, svc, ns, numPods)
Expect(err).NotTo(HaveOccurred())
defer func() {
framework.StopServeHostnameService(cs, f.InternalClientset, f.ScalesGetter, ns, serviceName)
framework.StopServeHostnameService(cs, ns, serviceName)
}()
jig := framework.NewServiceTestJig(cs, serviceName)
svc, err = jig.Client.CoreV1().Services(ns).Get(serviceName, metav1.GetOptions{})
@ -2023,7 +2023,7 @@ func execAffinityTestForLBService(f *framework.Framework, cs clientset.Interface
svc = jig.WaitForLoadBalancerOrFail(ns, serviceName, framework.LoadBalancerCreateTimeoutDefault)
jig.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
defer func() {
framework.StopServeHostnameService(cs, f.InternalClientset, f.ScalesGetter, ns, serviceName)
framework.StopServeHostnameService(cs, ns, serviceName)
lb := cloudprovider.GetLoadBalancerName(svc)
framework.Logf("cleaning gce resource for %s", lb)
framework.CleanupServiceGCEResources(cs, lb, framework.TestContext.CloudConfig.Region, framework.TestContext.CloudConfig.Zone)

View File

@ -334,7 +334,7 @@ var _ = SIGDescribe("kubelet", func() {
}
By("Deleting the RC")
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, rcName)
framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, rcName)
// Check that the pods really are gone by querying /runningpods on the
// node. The /runningpods handler checks the container runtime (or its
// cache) and returns a list of running pods. Some possible causes of

View File

@ -118,7 +118,7 @@ func runResourceTrackingTest(f *framework.Framework, podsPerNode int, nodeNames
verifyCPULimits(expectedCPU, cpuSummary)
By("Deleting the RC")
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, rcName)
framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, rcName)
}
func verifyMemoryLimits(c clientset.Interface, expected framework.ResourceUsagePerContainer, actual framework.ResourceUsagePerNode) {

View File

@ -116,10 +116,8 @@ func (dtc *DensityTestConfig) deleteConfigMaps(testPhase *timer.Phase) {
func (dtc *DensityTestConfig) deleteDaemonSets(numberOfClients int, testPhase *timer.Phase) {
defer testPhase.End()
for i := range dtc.DaemonConfigs {
framework.ExpectNoError(framework.DeleteResourceAndPods(
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(
dtc.ClientSets[i%numberOfClients],
dtc.InternalClientsets[i%numberOfClients],
dtc.ScaleClients[i%numberOfClients],
extensions.Kind("DaemonSet"),
dtc.DaemonConfigs[i].Namespace,
dtc.DaemonConfigs[i].Name,
@ -320,15 +318,9 @@ func cleanupDensityTest(dtc DensityTestConfig, testPhaseDurations *timer.TestPha
name := dtc.Configs[i].GetName()
namespace := dtc.Configs[i].GetNamespace()
kind := dtc.Configs[i].GetKind()
if framework.TestContext.GarbageCollectorEnabled && kindSupportsGarbageCollector(kind) {
By(fmt.Sprintf("Cleaning up only the %v, garbage collector will clean up the pods", kind))
err := framework.DeleteResourceAndWaitForGC(dtc.ClientSets[i%numberOfClients], kind, namespace, name)
framework.ExpectNoError(err)
} else {
By(fmt.Sprintf("Cleaning up the %v and pods", kind))
err := framework.DeleteResourceAndPods(dtc.ClientSets[i%numberOfClients], dtc.InternalClientsets[i%numberOfClients], dtc.ScaleClients[i%numberOfClients], kind, namespace, name)
framework.ExpectNoError(err)
}
By(fmt.Sprintf("Cleaning up only the %v, garbage collector will clean up the pods", kind))
err := framework.DeleteResourceAndWaitForGC(dtc.ClientSets[i%numberOfClients], kind, namespace, name)
framework.ExpectNoError(err)
}
podCleanupPhase.End()
@ -922,7 +914,3 @@ func createRunningPodFromRC(wg *sync.WaitGroup, c clientset.Interface, name, ns,
framework.ExpectNoError(framework.WaitForControlledPodsRunning(c, ns, name, api.Kind("ReplicationController")))
framework.Logf("Found pod '%s' running", name)
}
func kindSupportsGarbageCollector(kind schema.GroupKind) bool {
return kind != extensions.Kind("Deployment") && kind != batch.Kind("Job")
}

View File

@ -286,10 +286,8 @@ var _ = SIGDescribe("Load capacity", func() {
}
daemonConfig.Run()
defer func(config *testutils.DaemonConfig) {
framework.ExpectNoError(framework.DeleteResourceAndPods(
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(
f.ClientSet,
f.InternalClientset,
f.ScalesGetter,
extensions.Kind("DaemonSet"),
config.Namespace,
config.Name,
@ -694,15 +692,9 @@ func deleteResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, deleti
defer wg.Done()
sleepUpTo(deletingTime)
if framework.TestContext.GarbageCollectorEnabled && config.GetKind() != extensions.Kind("Deployment") {
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(
config.GetClient(), config.GetKind(), config.GetNamespace(), config.GetName()),
fmt.Sprintf("deleting %v %s", config.GetKind(), config.GetName()))
} else {
framework.ExpectNoError(framework.DeleteResourceAndPods(
config.GetClient(), config.GetInternalClient(), config.GetScalesGetter(), config.GetKind(), config.GetNamespace(), config.GetName()),
fmt.Sprintf("deleting %v %s", config.GetKind(), config.GetName()))
}
framework.ExpectNoError(framework.DeleteResourceAndWaitForGC(
config.GetClient(), config.GetKind(), config.GetNamespace(), config.GetName()),
fmt.Sprintf("deleting %v %s", config.GetKind(), config.GetName()))
}
func CreateNamespaces(f *framework.Framework, namespaceCount int, namePrefix string, testPhase *timer.Phase) ([]*v1.Namespace, error) {

View File

@ -93,7 +93,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
err := CreateNodeSelectorPods(f, rcName, 2, nodeSelector, false)
return err
}, ns, rcName, false)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, ns, rcName)
// the first replica pod is scheduled, and the second pod will be rejected.
verifyResult(cs, 1, 1, ns)
})
@ -141,7 +141,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
},
}
rc := getRCWithInterPodAffinity(affinityRCName, labelsMap, replica, affinity, imageutils.GetPauseImageName())
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, affinityRCName)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, ns, affinityRCName)
// RC should be running successfully
// TODO: WaitForSchedulerAfterAction() can on be used to wait for failure event,
@ -167,7 +167,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
It("validates pod anti-affinity works properly when new replica pod is scheduled", func() {
By("Launching two pods on two distinct nodes to get two node names")
CreateHostPortPods(f, "host-port", 2, true)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "host-port")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, ns, "host-port")
podList, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{})
framework.ExpectNoError(err)
Expect(len(podList.Items)).To(Equal(2))
@ -218,7 +218,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
}
rc := getRCWithInterPodAffinityNodeSelector(labelRCName, labelsMap, replica, affinity,
imageutils.GetPauseImageName(), map[string]string{k: v})
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, labelRCName)
defer framework.DeleteRCAndWaitForGC(f.ClientSet, ns, labelRCName)
WaitForSchedulerAfterAction(f, func() error {
_, err := cs.CoreV1().ReplicationControllers(ns).Create(rc)

View File

@ -71,7 +71,7 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
rc, err := cs.CoreV1().ReplicationControllers(ns).Get(RCName, metav1.GetOptions{})
if err == nil && *(rc.Spec.Replicas) != 0 {
By("Cleaning up the replication controller")
err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, RCName)
err := framework.DeleteRCAndWaitForGC(f.ClientSet, ns, RCName)
framework.ExpectNoError(err)
}
})

View File

@ -153,7 +153,7 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
// Cleanup the replication controller when we are done.
defer func() {
// Resize the replication controller to zero to get rid of pods.
if err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, rc.Name); err != nil {
if err := framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, rc.Name); err != nil {
framework.Logf("Failed to cleanup replication controller %v: %v.", rc.Name, err)
}
}()

View File

@ -56,7 +56,7 @@ var _ = SIGDescribe("Rescheduler [Serial]", func() {
It("should ensure that critical pod is scheduled in case there is no resources available", func() {
By("reserving all available cpu")
err := reserveAllCpu(f, "reserve-all-cpu", totalMillicores)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "reserve-all-cpu")
defer framework.DeleteRCAndWaitForGC(f.ClientSet, ns, "reserve-all-cpu")
framework.ExpectNoError(err)
By("creating a new instance of Dashboard and waiting for Dashboard to be scheduled")

View File

@ -224,7 +224,7 @@ func SpreadRCOrFail(f *framework.Framework, replicaCount int32, image string) {
// Cleanup the replication controller when we are done.
defer func() {
// Resize the replication controller to zero to get rid of pods.
if err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, controller.Name); err != nil {
if err := framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, controller.Name); err != nil {
framework.Logf("Failed to cleanup replication controller %v: %v.", controller.Name, err)
}
}()

View File

@ -374,7 +374,7 @@ func testNoWrappedVolumeRace(f *framework.Framework, volumes []v1.Volume, volume
Expect(err).NotTo(HaveOccurred(), "error creating replication controller")
defer func() {
err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, rcName)
err := framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, rcName)
framework.ExpectNoError(err)
}()

View File

@ -25,13 +25,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
appsinternal "k8s.io/kubernetes/pkg/apis/apps"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubectl"
)
func deleteResource(c clientset.Interface, kind schema.GroupKind, namespace, name string, options *metav1.DeleteOptions) error {
@ -72,21 +69,3 @@ func DeleteResourceWithRetries(c clientset.Interface, kind schema.GroupKind, nam
}
return RetryWithExponentialBackOff(deleteFunc)
}
func DeleteResourceUsingReaperWithRetries(c internalclientset.Interface, kind schema.GroupKind, namespace, name string, options *metav1.DeleteOptions, scaleClient scaleclient.ScalesGetter) error {
reaper, err := kubectl.ReaperFor(kind, c, scaleClient)
if err != nil {
return err
}
deleteFunc := func() (bool, error) {
err := reaper.Stop(namespace, name, 0, options)
if err == nil || apierrs.IsNotFound(err) {
return true, nil
}
if IsRetryableAPIError(err) {
return false, nil
}
return false, fmt.Errorf("Failed to delete object with non-retriable error: %v", err)
}
return RetryWithExponentialBackOff(deleteFunc)
}