mirror of https://github.com/k3s-io/k3s
the garbage collector controller
parent
aada051b20
commit
c73406bcfe
|
@ -298,7 +298,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
|
|||
// Find the list of namespaced resources via discovery that the namespace controller must manage
|
||||
namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller"))
|
||||
namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc)
|
||||
groupVersionResources, err := namespacecontroller.ServerPreferredNamespacedGroupVersionResources(namespaceKubeClient.Discovery())
|
||||
groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to get supported resources from server: %v", err)
|
||||
}
|
||||
|
|
|
@ -220,7 +220,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||
// Find the list of namespaced resources via discovery that the namespace controller must manage
|
||||
namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller"))
|
||||
namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc)
|
||||
groupVersionResources, err := namespacecontroller.ServerPreferredNamespacedGroupVersionResources(namespaceKubeClient.Discovery())
|
||||
groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to get supported resources from server: %v", err)
|
||||
}
|
||||
|
|
|
@ -32,7 +32,9 @@ source "${KUBE_ROOT}/hack/lib/init.sh"
|
|||
KUBE_TEST_API_VERSIONS=${KUBE_TEST_API_VERSIONS:-"v1,extensions/v1beta1;v1,autoscaling/v1,batch/v1,apps/v1alpha1,policy/v1alpha1,extensions/v1beta1"}
|
||||
|
||||
# Give integration tests longer to run
|
||||
KUBE_TIMEOUT=${KUBE_TIMEOUT:--timeout 240s}
|
||||
# TODO: allow a larger value to be passed in
|
||||
#KUBE_TIMEOUT=${KUBE_TIMEOUT:--timeout 240s}
|
||||
KUBE_TIMEOUT="-timeout 1200s"
|
||||
KUBE_INTEGRATION_TEST_MAX_CONCURRENCY=${KUBE_INTEGRATION_TEST_MAX_CONCURRENCY:-"-1"}
|
||||
LOG_LEVEL=${LOG_LEVEL:-2}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package api
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
|
@ -89,3 +90,25 @@ func (meta *ObjectMeta) GetLabels() map[string]string { return m
|
|||
func (meta *ObjectMeta) SetLabels(labels map[string]string) { meta.Labels = labels }
|
||||
func (meta *ObjectMeta) GetAnnotations() map[string]string { return meta.Annotations }
|
||||
func (meta *ObjectMeta) SetAnnotations(annotations map[string]string) { meta.Annotations = annotations }
|
||||
|
||||
func (meta *ObjectMeta) GetOwnerReferences() []metatypes.OwnerReference {
|
||||
ret := make([]metatypes.OwnerReference, len(meta.OwnerReferences))
|
||||
for i := 0; i < len(meta.OwnerReferences); i++ {
|
||||
ret[i].Kind = meta.OwnerReferences[i].Kind
|
||||
ret[i].Name = meta.OwnerReferences[i].Name
|
||||
ret[i].UID = meta.OwnerReferences[i].UID
|
||||
ret[i].APIVersion = meta.OwnerReferences[i].APIVersion
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (meta *ObjectMeta) SetOwnerReferences(references []metatypes.OwnerReference) {
|
||||
newReferences := make([]OwnerReference, len(references))
|
||||
for i := 0; i < len(references); i++ {
|
||||
newReferences[i].Kind = references[i].Kind
|
||||
newReferences[i].Name = references[i].Name
|
||||
newReferences[i].UID = references[i].UID
|
||||
newReferences[i].APIVersion = references[i].APIVersion
|
||||
}
|
||||
meta.OwnerReferences = newReferences
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package meta
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
|
@ -57,6 +58,14 @@ type Object interface {
|
|||
SetLabels(labels map[string]string)
|
||||
GetAnnotations() map[string]string
|
||||
SetAnnotations(annotations map[string]string)
|
||||
GetOwnerReferences() []metatypes.OwnerReference
|
||||
SetOwnerReferences([]metatypes.OwnerReference)
|
||||
}
|
||||
|
||||
var _ Object = &runtime.Unstructured{}
|
||||
|
||||
type ListMetaAccessor interface {
|
||||
GetListMeta() List
|
||||
}
|
||||
|
||||
// List lets you work with list metadata from any of the versioned or
|
||||
|
@ -177,5 +186,3 @@ type RESTMapper interface {
|
|||
AliasesForResource(resource string) ([]string, bool)
|
||||
ResourceSingularizer(resource string) (singular string, err error)
|
||||
}
|
||||
|
||||
var _ Object = &runtime.Unstructured{}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
|
@ -28,19 +29,53 @@ import (
|
|||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
func ListAccessor(obj interface{}) (List, error) {
|
||||
if listMetaAccessor, ok := obj.(ListMetaAccessor); ok {
|
||||
if om := listMetaAccessor.GetListMeta(); om != nil {
|
||||
return om, nil
|
||||
}
|
||||
}
|
||||
// we may get passed an object that is directly portable to List
|
||||
if list, ok := obj.(List); ok {
|
||||
return list, nil
|
||||
}
|
||||
glog.V(4).Infof("Calling ListAccessor on non-internal object: %v", reflect.TypeOf(obj))
|
||||
// legacy path for objects that do not implement List and ListMetaAccessor via
|
||||
// reflection - very slow code path.
|
||||
v, err := conversion.EnforcePtr(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t := v.Type()
|
||||
if v.Kind() != reflect.Struct {
|
||||
return nil, fmt.Errorf("expected struct, but got %v: %v (%#v)", v.Kind(), t, v.Interface())
|
||||
}
|
||||
a := &genericAccessor{}
|
||||
listMeta := v.FieldByName("ListMeta")
|
||||
if listMeta.IsValid() {
|
||||
// look for the ListMeta fields
|
||||
if err := extractFromListMeta(listMeta, a); err != nil {
|
||||
return nil, fmt.Errorf("unable to find list fields on %#v: %v", listMeta, err)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("unable to find listMeta on %#v", v)
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
// Accessor takes an arbitrary object pointer and returns meta.Interface.
|
||||
// obj must be a pointer to an API type. An error is returned if the minimum
|
||||
// required fields are missing. Fields that are not required return the default
|
||||
// value and are a no-op if set.
|
||||
func Accessor(obj interface{}) (Object, error) {
|
||||
if oi, ok := obj.(ObjectMetaAccessor); ok {
|
||||
if om := oi.GetObjectMeta(); om != nil {
|
||||
if objectMetaAccessor, ok := obj.(ObjectMetaAccessor); ok {
|
||||
if om := objectMetaAccessor.GetObjectMeta(); om != nil {
|
||||
return om, nil
|
||||
}
|
||||
}
|
||||
// we may get passed an object that is directly portable to Object
|
||||
if oi, ok := obj.(Object); ok {
|
||||
return oi, nil
|
||||
if object, ok := obj.(Object); ok {
|
||||
return object, nil
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Calling Accessor on non-internal object: %v", reflect.TypeOf(obj))
|
||||
|
@ -310,6 +345,40 @@ func (resourceAccessor) SetResourceVersion(obj runtime.Object, version string) e
|
|||
return nil
|
||||
}
|
||||
|
||||
// extractFromOwnerReference extracts v to o. v is the OwnerReferences field of an object.
|
||||
func extractFromOwnerReference(v reflect.Value, o *metatypes.OwnerReference) error {
|
||||
if err := runtime.Field(v, "APIVersion", &o.APIVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := runtime.Field(v, "Kind", &o.Kind); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := runtime.Field(v, "Name", &o.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := runtime.Field(v, "UID", &o.UID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// setOwnerReference sets v to o. v is the OwnerReferences field of an object.
|
||||
func setOwnerReference(v reflect.Value, o *metatypes.OwnerReference) error {
|
||||
if err := runtime.SetField(o.APIVersion, v, "APIVersion"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := runtime.SetField(o.Kind, v, "Kind"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := runtime.SetField(o.Name, v, "Name"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := runtime.SetField(o.UID, v, "UID"); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// genericAccessor contains pointers to strings that can modify an arbitrary
|
||||
// struct and implements the Accessor interface.
|
||||
type genericAccessor struct {
|
||||
|
@ -325,6 +394,7 @@ type genericAccessor struct {
|
|||
deletionTimestamp **unversioned.Time
|
||||
labels *map[string]string
|
||||
annotations *map[string]string
|
||||
ownerReferences reflect.Value
|
||||
}
|
||||
|
||||
func (a genericAccessor) GetNamespace() string {
|
||||
|
@ -457,6 +527,41 @@ func (a genericAccessor) SetAnnotations(annotations map[string]string) {
|
|||
*a.annotations = annotations
|
||||
}
|
||||
|
||||
func (a genericAccessor) GetOwnerReferences() []metatypes.OwnerReference {
|
||||
var ret []metatypes.OwnerReference
|
||||
s := a.ownerReferences
|
||||
if s.Kind() != reflect.Ptr || s.Elem().Kind() != reflect.Slice {
|
||||
glog.Errorf("expect %v to be a pointer to slice", s)
|
||||
return ret
|
||||
}
|
||||
s = s.Elem()
|
||||
// Set the capacity to one element greater to avoid copy if the caller later append an element.
|
||||
ret = make([]metatypes.OwnerReference, s.Len(), s.Len()+1)
|
||||
for i := 0; i < s.Len(); i++ {
|
||||
if err := extractFromOwnerReference(s.Index(i), &ret[i]); err != nil {
|
||||
glog.Errorf("extractFromOwnerReference failed: %v", err)
|
||||
return ret
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (a genericAccessor) SetOwnerReferences(references []metatypes.OwnerReference) {
|
||||
s := a.ownerReferences
|
||||
if s.Kind() != reflect.Ptr || s.Elem().Kind() != reflect.Slice {
|
||||
glog.Errorf("expect %v to be a pointer to slice", s)
|
||||
}
|
||||
s = s.Elem()
|
||||
newReferences := reflect.MakeSlice(s.Type(), len(references), len(references))
|
||||
for i := 0; i < len(references); i++ {
|
||||
if err := setOwnerReference(newReferences.Index(i), &references[i]); err != nil {
|
||||
glog.Errorf("setOwnerReference failed: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
s.Set(newReferences)
|
||||
}
|
||||
|
||||
// extractFromTypeMeta extracts pointers to version and kind fields from an object
|
||||
func extractFromTypeMeta(v reflect.Value, a *genericAccessor) error {
|
||||
if err := runtime.FieldPtr(v, "APIVersion", &a.apiVersion); err != nil {
|
||||
|
@ -494,6 +599,14 @@ func extractFromObjectMeta(v reflect.Value, a *genericAccessor) error {
|
|||
if err := runtime.FieldPtr(v, "Annotations", &a.annotations); err != nil {
|
||||
return err
|
||||
}
|
||||
ownerReferences := v.FieldByName("OwnerReferences")
|
||||
if !ownerReferences.IsValid() {
|
||||
return fmt.Errorf("struct %#v lacks OwnerReferences type", v)
|
||||
}
|
||||
if ownerReferences.Kind() != reflect.Slice {
|
||||
return fmt.Errorf("expect %v to be a slice", ownerReferences.Kind())
|
||||
}
|
||||
a.ownerReferences = ownerReferences.Addr()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -20,9 +20,13 @@ import (
|
|||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/google/gofuzz"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
@ -127,17 +131,18 @@ func TestAPIObjectMeta(t *testing.T) {
|
|||
|
||||
func TestGenericTypeMeta(t *testing.T) {
|
||||
type TypeMeta struct {
|
||||
Kind string `json:"kind,omitempty"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
GenerateName string `json:"generateName,omitempty"`
|
||||
UID string `json:"uid,omitempty"`
|
||||
CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"`
|
||||
SelfLink string `json:"selfLink,omitempty"`
|
||||
ResourceVersion string `json:"resourceVersion,omitempty"`
|
||||
APIVersion string `json:"apiVersion,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
Kind string `json:"kind,omitempty"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
GenerateName string `json:"generateName,omitempty"`
|
||||
UID string `json:"uid,omitempty"`
|
||||
CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"`
|
||||
SelfLink string `json:"selfLink,omitempty"`
|
||||
ResourceVersion string `json:"resourceVersion,omitempty"`
|
||||
APIVersion string `json:"apiVersion,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
OwnerReferences []api.OwnerReference `json:"ownerReferences,omitempty"`
|
||||
}
|
||||
type Object struct {
|
||||
TypeMeta `json:",inline"`
|
||||
|
@ -236,18 +241,20 @@ func TestGenericTypeMeta(t *testing.T) {
|
|||
}
|
||||
|
||||
type InternalTypeMeta struct {
|
||||
Kind string `json:"kind,omitempty"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
GenerateName string `json:"generateName,omitempty"`
|
||||
UID string `json:"uid,omitempty"`
|
||||
CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"`
|
||||
SelfLink string `json:"selfLink,omitempty"`
|
||||
ResourceVersion string `json:"resourceVersion,omitempty"`
|
||||
APIVersion string `json:"apiVersion,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
Kind string `json:"kind,omitempty"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
GenerateName string `json:"generateName,omitempty"`
|
||||
UID string `json:"uid,omitempty"`
|
||||
CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"`
|
||||
SelfLink string `json:"selfLink,omitempty"`
|
||||
ResourceVersion string `json:"resourceVersion,omitempty"`
|
||||
APIVersion string `json:"apiVersion,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
OwnerReferences []api.OwnerReference `json:"ownerReferences,omitempty"`
|
||||
}
|
||||
|
||||
type InternalObject struct {
|
||||
TypeMeta InternalTypeMeta `json:",inline"`
|
||||
}
|
||||
|
@ -273,6 +280,7 @@ func TestGenericTypeMetaAccessor(t *testing.T) {
|
|||
SelfLink: "some/place/only/we/know",
|
||||
Labels: map[string]string{"foo": "bar"},
|
||||
Annotations: map[string]string{"x": "y"},
|
||||
// OwnerReferences are tested separately
|
||||
},
|
||||
}
|
||||
accessor := meta.NewAccessor()
|
||||
|
@ -418,15 +426,16 @@ func TestGenericObjectMeta(t *testing.T) {
|
|||
APIVersion string `json:"apiVersion,omitempty"`
|
||||
}
|
||||
type ObjectMeta struct {
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
GenerateName string `json:"generateName,omitempty"`
|
||||
UID string `json:"uid,omitempty"`
|
||||
CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"`
|
||||
SelfLink string `json:"selfLink,omitempty"`
|
||||
ResourceVersion string `json:"resourceVersion,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
GenerateName string `json:"generateName,omitempty"`
|
||||
UID string `json:"uid,omitempty"`
|
||||
CreationTimestamp unversioned.Time `json:"creationTimestamp,omitempty"`
|
||||
SelfLink string `json:"selfLink,omitempty"`
|
||||
ResourceVersion string `json:"resourceVersion,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
OwnerReferences []api.OwnerReference `json:"ownerReferences,omitempty"`
|
||||
}
|
||||
type Object struct {
|
||||
TypeMeta `json:",inline"`
|
||||
|
@ -722,6 +731,66 @@ func TestTypeMetaSelfLinker(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type MyAPIObject2 struct {
|
||||
unversioned.TypeMeta
|
||||
v1.ObjectMeta
|
||||
}
|
||||
|
||||
func getObjectMetaAndOwnerRefereneces() (myAPIObject2 MyAPIObject2, metaOwnerReferences []metatypes.OwnerReference) {
|
||||
fuzz.New().NilChance(.5).NumElements(1, 5).Fuzz(&myAPIObject2)
|
||||
references := myAPIObject2.ObjectMeta.OwnerReferences
|
||||
// This is necessary for the test to pass because the getter will return a
|
||||
// non-nil slice.
|
||||
metaOwnerReferences = make([]metatypes.OwnerReference, 0)
|
||||
for i := 0; i < len(references); i++ {
|
||||
metaOwnerReferences = append(metaOwnerReferences, metatypes.OwnerReference{
|
||||
Kind: references[i].Kind,
|
||||
Name: references[i].Name,
|
||||
UID: references[i].UID,
|
||||
APIVersion: references[i].APIVersion,
|
||||
})
|
||||
}
|
||||
if len(references) == 0 {
|
||||
// This is necessary for the test to pass because the setter will make a
|
||||
// non-nil slice.
|
||||
myAPIObject2.ObjectMeta.OwnerReferences = make([]v1.OwnerReference, 0)
|
||||
}
|
||||
return myAPIObject2, metaOwnerReferences
|
||||
}
|
||||
|
||||
func testGetOwnerReferences(t *testing.T) {
|
||||
obj, expected := getObjectMetaAndOwnerRefereneces()
|
||||
accessor, err := meta.Accessor(&obj)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
references := accessor.GetOwnerReferences()
|
||||
if !reflect.DeepEqual(references, expected) {
|
||||
t.Errorf("expect %#v\n got %#v", expected, references)
|
||||
}
|
||||
}
|
||||
|
||||
func testSetOwnerReferences(t *testing.T) {
|
||||
expected, references := getObjectMetaAndOwnerRefereneces()
|
||||
obj := MyAPIObject2{}
|
||||
accessor, err := meta.Accessor(&obj)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
accessor.SetOwnerReferences(references)
|
||||
if e, a := expected.ObjectMeta.OwnerReferences, obj.ObjectMeta.OwnerReferences; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("expect %#v\n got %#v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccessOwnerReferences(t *testing.T) {
|
||||
fuzzIter := 5
|
||||
for i := 0; i < fuzzIter; i++ {
|
||||
testGetOwnerReferences(t)
|
||||
testSetOwnerReferences(t)
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkAccessorSetFastPath shows the interface fast path
|
||||
func BenchmarkAccessorSetFastPath(b *testing.B) {
|
||||
obj := &api.Pod{
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
// +build !ignore_autogenerated
|
||||
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// This file was autogenerated by deepcopy-gen. Do not edit it manually!
|
||||
|
||||
package metatypes
|
||||
|
||||
import (
|
||||
conversion "k8s.io/kubernetes/pkg/conversion"
|
||||
)
|
||||
|
||||
func DeepCopy_metatypes_OwnerReference(in OwnerReference, out *OwnerReference, c *conversion.Cloner) error {
|
||||
out.APIVersion = in.APIVersion
|
||||
out.Kind = in.Kind
|
||||
out.UID = in.UID
|
||||
out.Name = in.Name
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// The types defined in this package are used by the meta package to represent
|
||||
// the in-memory version of objects. We cannot reuse the __internal version of
|
||||
// API objects because it causes import cycle.
|
||||
package metatypes
|
||||
|
||||
import "k8s.io/kubernetes/pkg/types"
|
||||
|
||||
type OwnerReference struct {
|
||||
APIVersion string
|
||||
Kind string
|
||||
UID types.UID
|
||||
Name string
|
||||
}
|
|
@ -17,10 +17,14 @@ limitations under the License.
|
|||
package api_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/google/gofuzz"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
)
|
||||
|
||||
var _ meta.Object = &api.ObjectMeta{}
|
||||
|
@ -49,3 +53,46 @@ func TestHasObjectMetaSystemFieldValues(t *testing.T) {
|
|||
t.Errorf("the resource does have all fields populated, but incorrectly reports it does not")
|
||||
}
|
||||
}
|
||||
|
||||
func getObjectMetaAndOwnerReferences() (objectMeta api.ObjectMeta, metaOwnerReferences []metatypes.OwnerReference) {
|
||||
fuzz.New().NilChance(.5).NumElements(1, 5).Fuzz(&objectMeta)
|
||||
references := objectMeta.OwnerReferences
|
||||
metaOwnerReferences = make([]metatypes.OwnerReference, 0)
|
||||
for i := 0; i < len(references); i++ {
|
||||
metaOwnerReferences = append(metaOwnerReferences, metatypes.OwnerReference{
|
||||
Kind: references[i].Kind,
|
||||
Name: references[i].Name,
|
||||
UID: references[i].UID,
|
||||
APIVersion: references[i].APIVersion,
|
||||
})
|
||||
}
|
||||
if len(references) == 0 {
|
||||
objectMeta.OwnerReferences = make([]api.OwnerReference, 0)
|
||||
}
|
||||
return objectMeta, metaOwnerReferences
|
||||
}
|
||||
|
||||
func testGetOwnerReferences(t *testing.T) {
|
||||
meta, expected := getObjectMetaAndOwnerReferences()
|
||||
refs := meta.GetOwnerReferences()
|
||||
if !reflect.DeepEqual(refs, expected) {
|
||||
t.Errorf("expect %v\n got %v", expected, refs)
|
||||
}
|
||||
}
|
||||
|
||||
func testSetOwnerReferences(t *testing.T) {
|
||||
expected, newRefs := getObjectMetaAndOwnerReferences()
|
||||
objectMeta := &api.ObjectMeta{}
|
||||
objectMeta.SetOwnerReferences(newRefs)
|
||||
if !reflect.DeepEqual(expected.OwnerReferences, objectMeta.OwnerReferences) {
|
||||
t.Errorf("expect: %#v\n got: %#v", expected.OwnerReferences, objectMeta.OwnerReferences)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccessOwnerReferences(t *testing.T) {
|
||||
fuzzIter := 5
|
||||
for i := 0; i < fuzzIter; i++ {
|
||||
testGetOwnerReferences(t)
|
||||
testSetOwnerReferences(t)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -292,11 +292,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
|
||||
}
|
||||
metaInterface, err := meta.Accessor(list)
|
||||
listMetaInterface, err := meta.ListAccessor(list)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: Unable to understand list result %#v", r.name, list)
|
||||
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
|
||||
}
|
||||
resourceVersion = metaInterface.GetResourceVersion()
|
||||
resourceVersion = listMetaInterface.GetResourceVersion()
|
||||
items, err := meta.ExtractList(list)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/emicklei/go-restful/swagger"
|
||||
|
||||
|
@ -30,6 +31,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/runtime/serializer"
|
||||
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||
"k8s.io/kubernetes/pkg/version"
|
||||
)
|
||||
|
||||
|
@ -55,6 +57,12 @@ type ServerResourcesInterface interface {
|
|||
ServerResourcesForGroupVersion(groupVersion string) (*unversioned.APIResourceList, error)
|
||||
// ServerResources returns the supported resources for all groups and versions.
|
||||
ServerResources() (map[string]*unversioned.APIResourceList, error)
|
||||
// ServerPreferredResources returns the supported resources with the version preferred by the
|
||||
// server.
|
||||
ServerPreferredResources() ([]unversioned.GroupVersionResource, error)
|
||||
// ServerPreferredNamespacedResources returns the supported namespaced resources with the
|
||||
// version preferred by the server.
|
||||
ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error)
|
||||
}
|
||||
|
||||
// ServerVersionInterface has a method for retrieving the server's version.
|
||||
|
@ -163,6 +171,50 @@ func (d *DiscoveryClient) ServerResources() (map[string]*unversioned.APIResource
|
|||
return result, nil
|
||||
}
|
||||
|
||||
// serverPreferredResources returns the supported resources with the version preferred by the
|
||||
// server. If namespaced is true, only namespaced resources will be returned.
|
||||
func (d *DiscoveryClient) serverPreferredResources(namespaced bool) ([]unversioned.GroupVersionResource, error) {
|
||||
results := []unversioned.GroupVersionResource{}
|
||||
serverGroupList, err := d.ServerGroups()
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
allErrs := []error{}
|
||||
for _, apiGroup := range serverGroupList.Groups {
|
||||
preferredVersion := apiGroup.PreferredVersion
|
||||
apiResourceList, err := d.ServerResourcesForGroupVersion(preferredVersion.GroupVersion)
|
||||
if err != nil {
|
||||
allErrs = append(allErrs, err)
|
||||
continue
|
||||
}
|
||||
groupVersion := unversioned.GroupVersion{Group: apiGroup.Name, Version: preferredVersion.Version}
|
||||
for _, apiResource := range apiResourceList.APIResources {
|
||||
// ignore the root scoped resources if "namespaced" is true.
|
||||
if namespaced && !apiResource.Namespaced {
|
||||
continue
|
||||
}
|
||||
if strings.Contains(apiResource.Name, "/") {
|
||||
continue
|
||||
}
|
||||
results = append(results, groupVersion.WithResource(apiResource.Name))
|
||||
}
|
||||
}
|
||||
return results, utilerrors.NewAggregate(allErrs)
|
||||
}
|
||||
|
||||
// ServerPreferredResources returns the supported resources with the version preferred by the
|
||||
// server.
|
||||
func (d *DiscoveryClient) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) {
|
||||
return d.serverPreferredResources(false)
|
||||
}
|
||||
|
||||
// ServerPreferredNamespacedResources returns the supported namespaced resources with the
|
||||
// version preferred by the server.
|
||||
func (d *DiscoveryClient) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) {
|
||||
return d.serverPreferredResources(true)
|
||||
}
|
||||
|
||||
// ServerVersion retrieves and parses the server's version (git version).
|
||||
func (d *DiscoveryClient) ServerVersion() (*version.Info, error) {
|
||||
body, err := d.Get().AbsPath("/version").Do().Raw()
|
||||
|
|
|
@ -46,6 +46,14 @@ func (c *FakeDiscovery) ServerResources() (map[string]*unversioned.APIResourceLi
|
|||
return c.Resources, nil
|
||||
}
|
||||
|
||||
func (c *FakeDiscovery) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *FakeDiscovery) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *FakeDiscovery) ServerGroups() (*unversioned.APIGroupList, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -382,6 +382,14 @@ type FakeDiscovery struct {
|
|||
*Fake
|
||||
}
|
||||
|
||||
func (c *FakeDiscovery) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *FakeDiscovery) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *FakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*unversioned.APIResourceList, error) {
|
||||
action := ActionImpl{
|
||||
Verb: "get",
|
||||
|
|
|
@ -0,0 +1,506 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package garbagecollector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/client/typed/dynamic"
|
||||
"k8s.io/kubernetes/pkg/controller/framework"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
const ResourceResyncTime = 60 * time.Second
|
||||
|
||||
type monitor struct {
|
||||
store cache.Store
|
||||
controller *framework.Controller
|
||||
}
|
||||
|
||||
type objectReference struct {
|
||||
metatypes.OwnerReference
|
||||
// This is needed by the dynamic client
|
||||
Namespace string
|
||||
}
|
||||
|
||||
func (s objectReference) String() string {
|
||||
return fmt.Sprintf("[%s/%s, namespace: %s, name: %s, uid: %s]", s.APIVersion, s.Kind, s.Namespace, s.Name, s.UID)
|
||||
}
|
||||
|
||||
// node does not require a lock to protect. The single-threaded
|
||||
// Propagator.processEvent() is the sole writer of the nodes. The multi-threaded
|
||||
// GarbageCollector.processItem() reads the nodes, but it only reads the fields
|
||||
// that never get changed by Propagator.processEvent().
|
||||
type node struct {
|
||||
identity objectReference
|
||||
dependents map[*node]struct{}
|
||||
// When processing an Update event, we need to compare the updated
|
||||
// ownerReferences with the owners recorded in the graph.
|
||||
owners []metatypes.OwnerReference
|
||||
}
|
||||
|
||||
type eventType int
|
||||
|
||||
const (
|
||||
addEvent eventType = iota
|
||||
updateEvent
|
||||
deleteEvent
|
||||
)
|
||||
|
||||
type event struct {
|
||||
eventType eventType
|
||||
obj interface{}
|
||||
// the update event comes with an old object, but it's not used by the garbage collector.
|
||||
oldObj interface{}
|
||||
}
|
||||
|
||||
type Propagator struct {
|
||||
eventQueue *workqueue.Type
|
||||
// uidToNode doesn't require a lock to protect, because only the
|
||||
// single-threaded Propagator.processEvent() reads/writes it.
|
||||
uidToNode map[types.UID]*node
|
||||
gc *GarbageCollector
|
||||
}
|
||||
|
||||
// addDependentToOwners adds n to owners' dependents list. If the owner does not
|
||||
// exist in the p.uidToNode yet, a "virtual" node will be created to represent
|
||||
// the owner. The "virtual" node will be enqueued to the dirtyQueue, so that
|
||||
// processItem() will verify if the owner exists according to the API server.
|
||||
func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerReference) {
|
||||
for _, owner := range owners {
|
||||
ownerNode, ok := p.uidToNode[owner.UID]
|
||||
if !ok {
|
||||
// Create a "virtual" node in the graph for the owner if it doesn't
|
||||
// exist in the graph yet. Then enqueue the virtual node into the
|
||||
// dirtyQueue. The garbage processor will enqueue a virtual delete
|
||||
// event to delete it from the graph if API server confirms this
|
||||
// owner doesn't exist.
|
||||
ownerNode = &node{
|
||||
identity: objectReference{
|
||||
OwnerReference: owner,
|
||||
Namespace: n.identity.Namespace,
|
||||
},
|
||||
dependents: make(map[*node]struct{}),
|
||||
}
|
||||
p.uidToNode[ownerNode.identity.UID] = ownerNode
|
||||
p.gc.dirtyQueue.Add(ownerNode)
|
||||
}
|
||||
ownerNode.dependents[n] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// insertNode insert the node to p.uidToNode; then it finds all owners as listed
|
||||
// in n.owners, and adds the node to their dependents list.
|
||||
func (p *Propagator) insertNode(n *node) {
|
||||
p.uidToNode[n.identity.UID] = n
|
||||
p.addDependentToOwners(n, n.owners)
|
||||
}
|
||||
|
||||
// removeDependentFromOwners remove n from owners' dependents list.
|
||||
func (p *Propagator) removeDependentFromOwners(n *node, owners []metatypes.OwnerReference) {
|
||||
for _, owner := range owners {
|
||||
ownerNode, ok := p.uidToNode[owner.UID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
delete(ownerNode.dependents, n)
|
||||
}
|
||||
}
|
||||
|
||||
// removeNode removes the node from p.uidToNode, then finds all
|
||||
// owners as listed in n.owners, and removes n from their dependents list.
|
||||
func (p *Propagator) removeNode(n *node) {
|
||||
delete(p.uidToNode, n.identity.UID)
|
||||
p.removeDependentFromOwners(n, n.owners)
|
||||
}
|
||||
|
||||
// TODO: profile this function to see if a naive N^2 algorithm performs better
|
||||
// when the number of references is small.
|
||||
func referencesDiffs(old []metatypes.OwnerReference, new []metatypes.OwnerReference) (added []metatypes.OwnerReference, removed []metatypes.OwnerReference) {
|
||||
oldUIDToRef := make(map[string]metatypes.OwnerReference)
|
||||
for i := 0; i < len(old); i++ {
|
||||
oldUIDToRef[string(old[i].UID)] = old[i]
|
||||
}
|
||||
oldUIDSet := sets.StringKeySet(oldUIDToRef)
|
||||
newUIDToRef := make(map[string]metatypes.OwnerReference)
|
||||
for i := 0; i < len(new); i++ {
|
||||
newUIDToRef[string(new[i].UID)] = new[i]
|
||||
}
|
||||
newUIDSet := sets.StringKeySet(newUIDToRef)
|
||||
|
||||
addedUID := newUIDSet.Difference(oldUIDSet)
|
||||
removedUID := oldUIDSet.Difference(newUIDSet)
|
||||
|
||||
for uid := range addedUID {
|
||||
added = append(added, newUIDToRef[uid])
|
||||
}
|
||||
for uid := range removedUID {
|
||||
removed = append(removed, oldUIDToRef[uid])
|
||||
}
|
||||
return added, removed
|
||||
}
|
||||
|
||||
// Dequeueing an event from eventQueue, updating graph, populating dirty_queue.
|
||||
func (p *Propagator) processEvent() {
|
||||
key, quit := p.eventQueue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
defer p.eventQueue.Done(key)
|
||||
event, ok := key.(event)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("expect an event, got %v", key))
|
||||
return
|
||||
}
|
||||
obj := event.obj
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
|
||||
return
|
||||
}
|
||||
typeAccessor, err := meta.TypeAccessor(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
|
||||
return
|
||||
}
|
||||
glog.V(6).Infof("Propagator process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType)
|
||||
// Check if the node already exsits
|
||||
existingNode, found := p.uidToNode[accessor.GetUID()]
|
||||
switch {
|
||||
case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
|
||||
newNode := &node{
|
||||
identity: objectReference{
|
||||
OwnerReference: metatypes.OwnerReference{
|
||||
APIVersion: typeAccessor.GetAPIVersion(),
|
||||
Kind: typeAccessor.GetKind(),
|
||||
UID: accessor.GetUID(),
|
||||
Name: accessor.GetName(),
|
||||
},
|
||||
Namespace: accessor.GetNamespace(),
|
||||
},
|
||||
dependents: make(map[*node]struct{}),
|
||||
owners: accessor.GetOwnerReferences(),
|
||||
}
|
||||
p.insertNode(newNode)
|
||||
case (event.eventType == addEvent || event.eventType == updateEvent) && found:
|
||||
// TODO: finalizer: Check if ObjectMeta.DeletionTimestamp is updated from nil to non-nil
|
||||
// We only need to add/remove owner refs for now
|
||||
added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
|
||||
if len(added) == 0 && len(removed) == 0 {
|
||||
glog.V(6).Infof("The updateEvent %#v doesn't change node references, ignore", event)
|
||||
return
|
||||
}
|
||||
// update the node itself
|
||||
existingNode.owners = accessor.GetOwnerReferences()
|
||||
// Add the node to its new owners' dependent lists.
|
||||
p.addDependentToOwners(existingNode, added)
|
||||
// remove the node from the dependent list of node that are no long in
|
||||
// the node's owners list.
|
||||
p.removeDependentFromOwners(existingNode, removed)
|
||||
case event.eventType == deleteEvent:
|
||||
if !found {
|
||||
glog.V(6).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
|
||||
return
|
||||
}
|
||||
p.removeNode(existingNode)
|
||||
for dep := range existingNode.dependents {
|
||||
p.gc.dirtyQueue.Add(dep)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GarbageCollector is responsible for carrying out cascading deletion, and
|
||||
// removing ownerReferences from the dependents if the owner is deleted with
|
||||
// DeleteOptions.OrphanDependents=true.
|
||||
type GarbageCollector struct {
|
||||
restMapper meta.RESTMapper
|
||||
clientPool dynamic.ClientPool
|
||||
dirtyQueue *workqueue.Type
|
||||
monitors []monitor
|
||||
propagator *Propagator
|
||||
}
|
||||
|
||||
func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource) (monitor, error) {
|
||||
// TODO: consider store in one storage.
|
||||
glog.V(6).Infof("create storage for resource %s", resource)
|
||||
var monitor monitor
|
||||
client, err := clientPool.ClientForGroupVersion(resource.GroupVersion())
|
||||
if err != nil {
|
||||
return monitor, err
|
||||
}
|
||||
monitor.store, monitor.controller = framework.NewInformer(
|
||||
// TODO: make special List and Watch function that removes fields other
|
||||
// than ObjectMeta.
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
// APIResource.Kind is not used by the dynamic client, so
|
||||
// leave it empty. We want to list this resource in all
|
||||
// namespaces if it's namespace scoped, so leave
|
||||
// APIResource.Namespaced as false is all right.
|
||||
apiResource := unversioned.APIResource{Name: resource.Resource}
|
||||
return client.Resource(&apiResource, api.NamespaceAll).List(&options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
// APIResource.Kind is not used by the dynamic client, so
|
||||
// leave it empty. We want to list this resource in all
|
||||
// namespaces if it's namespace scoped, so leave
|
||||
// APIResource.Namespaced as false is all right.
|
||||
apiResource := unversioned.APIResource{Name: resource.Resource}
|
||||
return client.Resource(&apiResource, api.NamespaceAll).Watch(&options)
|
||||
},
|
||||
},
|
||||
nil,
|
||||
ResourceResyncTime,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
// add the event to the propagator's eventQueue.
|
||||
AddFunc: func(obj interface{}) {
|
||||
event := event{
|
||||
eventType: addEvent,
|
||||
obj: obj,
|
||||
}
|
||||
p.eventQueue.Add(event)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
event := event{updateEvent, newObj, oldObj}
|
||||
p.eventQueue.Add(event)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
|
||||
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||||
obj = deletedFinalStateUnknown.Obj
|
||||
}
|
||||
event := event{
|
||||
eventType: deleteEvent,
|
||||
obj: obj,
|
||||
}
|
||||
p.eventQueue.Add(event)
|
||||
},
|
||||
},
|
||||
)
|
||||
return monitor, nil
|
||||
}
|
||||
|
||||
var ignoredResources = map[unversioned.GroupVersionResource]struct{}{
|
||||
unversioned.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicationcontrollers"}: {},
|
||||
unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "bindings"}: {},
|
||||
unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "componentstatuses"}: {},
|
||||
unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "events"}: {},
|
||||
}
|
||||
|
||||
func NewGarbageCollector(clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
|
||||
gc := &GarbageCollector{
|
||||
clientPool: clientPool,
|
||||
dirtyQueue: workqueue.New(),
|
||||
// TODO: should use a dynamic RESTMapper built from the discovery results.
|
||||
restMapper: registered.RESTMapper(),
|
||||
}
|
||||
gc.propagator = &Propagator{
|
||||
eventQueue: workqueue.New(),
|
||||
uidToNode: make(map[types.UID]*node),
|
||||
gc: gc,
|
||||
}
|
||||
for _, resource := range resources {
|
||||
if _, ok := ignoredResources[resource]; ok {
|
||||
glog.V(6).Infof("ignore resource %#v", resource)
|
||||
continue
|
||||
}
|
||||
monitor, err := monitorFor(gc.propagator, gc.clientPool, resource)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gc.monitors = append(gc.monitors, monitor)
|
||||
}
|
||||
return gc, nil
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) worker() {
|
||||
key, quit := gc.dirtyQueue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
defer gc.dirtyQueue.Done(key)
|
||||
err := gc.processItem(key.(*node))
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error syncing item %v: %v", key, err))
|
||||
}
|
||||
}
|
||||
|
||||
// apiResource consults the REST mapper to translate an <apiVersion, kind,
|
||||
// namespace> tuple to a unversioned.APIResource struct.
|
||||
func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool) (*unversioned.APIResource, error) {
|
||||
fqKind := unversioned.FromAPIVersionAndKind(apiVersion, kind)
|
||||
mapping, err := gc.restMapper.RESTMapping(fqKind.GroupKind(), apiVersion)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get REST mapping for kind: %s, version: %s", kind, apiVersion)
|
||||
}
|
||||
glog.V(6).Infof("map kind %s, version %s to resource %s", kind, apiVersion, mapping.Resource)
|
||||
resource := unversioned.APIResource{
|
||||
Name: mapping.Resource,
|
||||
Namespaced: namespaced,
|
||||
Kind: kind,
|
||||
}
|
||||
return &resource, nil
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) deleteObject(item objectReference) error {
|
||||
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
|
||||
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
|
||||
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uid := item.UID
|
||||
preconditions := v1.Preconditions{UID: &uid}
|
||||
deleteOptions := v1.DeleteOptions{Preconditions: &preconditions}
|
||||
return client.Resource(resource, item.Namespace).Delete(item.Name, &deleteOptions)
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructured, error) {
|
||||
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
|
||||
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
|
||||
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return client.Resource(resource, item.Namespace).Get(item.Name)
|
||||
}
|
||||
|
||||
func objectReferenceToUnstructured(ref objectReference) *runtime.Unstructured {
|
||||
ret := &runtime.Unstructured{}
|
||||
ret.SetKind(ref.Kind)
|
||||
ret.SetAPIVersion(ref.APIVersion)
|
||||
ret.SetUID(ref.UID)
|
||||
ret.SetNamespace(ref.Namespace)
|
||||
ret.SetName(ref.Name)
|
||||
return ret
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) processItem(item *node) error {
|
||||
// Get the latest item from the API server
|
||||
latest, err := gc.getObject(item.identity)
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
// the Propagator can add "virtual" node for an owner that doesn't
|
||||
// exist yet, so we need to enqueue a virtual Delete event to remove
|
||||
// the virtual node from Propagator.uidToNode.
|
||||
glog.V(6).Infof("item %v not found, generating a virtual delete event", item.identity)
|
||||
event := event{
|
||||
eventType: deleteEvent,
|
||||
obj: objectReferenceToUnstructured(item.identity),
|
||||
}
|
||||
gc.propagator.eventQueue.Add(event)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
if latest.GetUID() != item.identity.UID {
|
||||
glog.V(6).Infof("UID doesn't match, item %v not found, ignore it", item.identity)
|
||||
return nil
|
||||
}
|
||||
ownerReferences := latest.GetOwnerReferences()
|
||||
if len(ownerReferences) == 0 {
|
||||
glog.V(6).Infof("object %s's doesn't have an owner, continue on next item", item.identity)
|
||||
return nil
|
||||
}
|
||||
// TODO: we need to remove dangling references if the object is not to be
|
||||
// deleted.
|
||||
for _, reference := range ownerReferences {
|
||||
// TODO: we need to verify the reference resource is supported by the
|
||||
// system. If it's not a valid resource, the garbage collector should i)
|
||||
// ignore the reference when decide if the object should be deleted, and
|
||||
// ii) should update the object to remove such references. This is to
|
||||
// prevent objects having references to an old resource from being
|
||||
// deleted during a cluster upgrade.
|
||||
fqKind := unversioned.FromAPIVersionAndKind(reference.APIVersion, reference.Kind)
|
||||
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resource, err := gc.apiResource(reference.APIVersion, reference.Kind, len(item.identity.Namespace) != 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
owner, err := client.Resource(resource, item.identity.Namespace).Get(reference.Name)
|
||||
if err == nil {
|
||||
if owner.GetUID() != reference.UID {
|
||||
glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
|
||||
continue
|
||||
}
|
||||
glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID)
|
||||
return nil
|
||||
} else if errors.IsNotFound(err) {
|
||||
glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
glog.V(2).Infof("none of object %s's owners exist any more, will garbage collect it", item.identity)
|
||||
return gc.deleteObject(item.identity)
|
||||
}
|
||||
|
||||
func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
|
||||
for _, monitor := range gc.monitors {
|
||||
go monitor.controller.Run(stopCh)
|
||||
}
|
||||
|
||||
// worker
|
||||
go wait.Until(gc.propagator.processEvent, 0, stopCh)
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(gc.worker, 0, stopCh)
|
||||
}
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down garbage collector")
|
||||
gc.dirtyQueue.ShutDown()
|
||||
gc.propagator.eventQueue.ShutDown()
|
||||
}
|
||||
|
||||
// QueueDrained returns if the dirtyQueue and eventQueue are drained. It's
|
||||
// useful for debugging.
|
||||
func (gc *GarbageCollector) QueuesDrained() bool {
|
||||
return gc.dirtyQueue.Len() == 0 && gc.propagator.eventQueue.Len() == 0
|
||||
}
|
||||
|
||||
// *FOR TEST USE ONLY* It's not safe to call this function when the GC is still
|
||||
// busy.
|
||||
// GraphHasUID returns if the Propagator has a particular UID store in its
|
||||
// uidToNode graph. It's useful for debugging.
|
||||
func (gc *GarbageCollector) GraphHasUID(UIDs []types.UID) bool {
|
||||
for _, u := range UIDs {
|
||||
if _, ok := gc.propagator.uidToNode[u]; ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
|
@ -0,0 +1,287 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package garbagecollector
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
_ "k8s.io/kubernetes/pkg/api/install"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/client/typed/dynamic"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/json"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
)
|
||||
|
||||
func TestNewGarbageCollector(t *testing.T) {
|
||||
clientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc)
|
||||
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
|
||||
gc, err := NewGarbageCollector(clientPool, podResource)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.Equal(t, 1, len(gc.monitors))
|
||||
}
|
||||
|
||||
// fakeAction records information about requests to aid in testing.
|
||||
type fakeAction struct {
|
||||
method string
|
||||
path string
|
||||
}
|
||||
|
||||
// String returns method=path to aid in testing
|
||||
func (f *fakeAction) String() string {
|
||||
return strings.Join([]string{f.method, f.path}, "=")
|
||||
}
|
||||
|
||||
type FakeResponse struct {
|
||||
statusCode int
|
||||
content []byte
|
||||
}
|
||||
|
||||
// fakeActionHandler holds a list of fakeActions received
|
||||
type fakeActionHandler struct {
|
||||
// statusCode and content returned by this handler for different method + path.
|
||||
response map[string]FakeResponse
|
||||
|
||||
lock sync.Mutex
|
||||
actions []fakeAction
|
||||
}
|
||||
|
||||
// ServeHTTP logs the action that occurred and always returns the associated status code
|
||||
func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path})
|
||||
fakeResponse, ok := f.response[request.Method+request.URL.Path]
|
||||
if !ok {
|
||||
fakeResponse.statusCode = 200
|
||||
fakeResponse.content = []byte("{\"kind\": \"List\"}")
|
||||
}
|
||||
response.WriteHeader(fakeResponse.statusCode)
|
||||
response.Write(fakeResponse.content)
|
||||
}
|
||||
|
||||
// testServerAndClientConfig returns a server that listens and a config that can reference it
|
||||
func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) (*httptest.Server, *restclient.Config) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(handler))
|
||||
config := &restclient.Config{
|
||||
Host: srv.URL,
|
||||
}
|
||||
return srv, config
|
||||
}
|
||||
|
||||
func newDanglingPod() *v1.Pod {
|
||||
return &v1.Pod{
|
||||
TypeMeta: unversioned.TypeMeta{
|
||||
Kind: "Pod",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: "ToBeDeletedPod",
|
||||
Namespace: "ns1",
|
||||
OwnerReferences: []v1.OwnerReference{
|
||||
{
|
||||
Kind: "ReplicationController",
|
||||
Name: "owner1",
|
||||
UID: "123",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// test the processItem function making the expected actions.
|
||||
func TestProcessItem(t *testing.T) {
|
||||
pod := newDanglingPod()
|
||||
podBytes, err := json.Marshal(pod)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
testHandler := &fakeActionHandler{
|
||||
response: map[string]FakeResponse{
|
||||
"GET" + "/api/v1/namespaces/ns1/replicationcontrollers/owner1": {
|
||||
404,
|
||||
[]byte{},
|
||||
},
|
||||
"GET" + "/api/v1/namespaces/ns1/pods/ToBeDeletedPod": {
|
||||
200,
|
||||
podBytes,
|
||||
},
|
||||
},
|
||||
}
|
||||
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
|
||||
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
|
||||
defer srv.Close()
|
||||
clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
|
||||
gc, err := NewGarbageCollector(clientPool, podResource)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
item := &node{
|
||||
identity: objectReference{
|
||||
OwnerReference: metatypes.OwnerReference{
|
||||
Kind: pod.Kind,
|
||||
APIVersion: pod.APIVersion,
|
||||
Name: pod.Name,
|
||||
UID: pod.UID,
|
||||
},
|
||||
Namespace: pod.Namespace,
|
||||
},
|
||||
// owners are intentionally left empty. The processItem routine should get the latest item from the server.
|
||||
owners: nil,
|
||||
}
|
||||
err = gc.processItem(item)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected Error: %v", err)
|
||||
}
|
||||
expectedActionSet := sets.NewString()
|
||||
expectedActionSet.Insert("GET=/api/v1/namespaces/ns1/replicationcontrollers/owner1")
|
||||
expectedActionSet.Insert("DELETE=/api/v1/namespaces/ns1/pods/ToBeDeletedPod")
|
||||
expectedActionSet.Insert("GET=/api/v1/namespaces/ns1/pods/ToBeDeletedPod")
|
||||
|
||||
actualActionSet := sets.NewString()
|
||||
for _, action := range testHandler.actions {
|
||||
actualActionSet.Insert(action.String())
|
||||
}
|
||||
if !expectedActionSet.Equal(actualActionSet) {
|
||||
t.Errorf("expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet,
|
||||
actualActionSet, expectedActionSet.Difference(actualActionSet))
|
||||
}
|
||||
}
|
||||
|
||||
// verifyGraphInvariants verifies that all of a node's owners list the node as a
|
||||
// dependent and vice versa. uidToNode has all the nodes in the graph.
|
||||
func verifyGraphInvariants(scenario string, uidToNode map[types.UID]*node, t *testing.T) {
|
||||
for myUID, node := range uidToNode {
|
||||
for dependentNode := range node.dependents {
|
||||
found := false
|
||||
for _, owner := range dependentNode.owners {
|
||||
if owner.UID == myUID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("scenario: %s: node %s has node %s as a dependent, but it's not present in the latter node's owners list", scenario, node.identity, dependentNode.identity)
|
||||
}
|
||||
}
|
||||
|
||||
for _, owner := range node.owners {
|
||||
ownerNode, ok := uidToNode[owner.UID]
|
||||
if !ok {
|
||||
// It's possible that the owner node doesn't exist
|
||||
continue
|
||||
}
|
||||
if _, ok := ownerNode.dependents[node]; !ok {
|
||||
t.Errorf("node %s has node %s as an owner, but it's not present in the latter node's dependents list", node.identity, ownerNode.identity)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createEvent(eventType eventType, selfUID string, owners []string) event {
|
||||
var ownerReferences []api.OwnerReference
|
||||
for i := 0; i < len(owners); i++ {
|
||||
ownerReferences = append(ownerReferences, api.OwnerReference{UID: types.UID(owners[i])})
|
||||
}
|
||||
return event{
|
||||
eventType: eventType,
|
||||
obj: &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: types.UID(selfUID),
|
||||
OwnerReferences: ownerReferences,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessEvent(t *testing.T) {
|
||||
var testScenarios = []struct {
|
||||
name string
|
||||
// a series of events that will be supplied to the
|
||||
// Propagator.eventQueue.
|
||||
events []event
|
||||
}{
|
||||
{
|
||||
name: "test1",
|
||||
events: []event{
|
||||
createEvent(addEvent, "1", []string{}),
|
||||
createEvent(addEvent, "2", []string{"1"}),
|
||||
createEvent(addEvent, "3", []string{"1", "2"}),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test2",
|
||||
events: []event{
|
||||
createEvent(addEvent, "1", []string{}),
|
||||
createEvent(addEvent, "2", []string{"1"}),
|
||||
createEvent(addEvent, "3", []string{"1", "2"}),
|
||||
createEvent(addEvent, "4", []string{"2"}),
|
||||
createEvent(deleteEvent, "2", []string{"doesn't matter"}),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test3",
|
||||
events: []event{
|
||||
createEvent(addEvent, "1", []string{}),
|
||||
createEvent(addEvent, "2", []string{"1"}),
|
||||
createEvent(addEvent, "3", []string{"1", "2"}),
|
||||
createEvent(addEvent, "4", []string{"3"}),
|
||||
createEvent(updateEvent, "2", []string{"4"}),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reverse test2",
|
||||
events: []event{
|
||||
createEvent(addEvent, "4", []string{"2"}),
|
||||
createEvent(addEvent, "3", []string{"1", "2"}),
|
||||
createEvent(addEvent, "2", []string{"1"}),
|
||||
createEvent(addEvent, "1", []string{}),
|
||||
createEvent(deleteEvent, "2", []string{"doesn't matter"}),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, scenario := range testScenarios {
|
||||
propagator := &Propagator{
|
||||
eventQueue: workqueue.New(),
|
||||
uidToNode: make(map[types.UID]*node),
|
||||
gc: &GarbageCollector{
|
||||
dirtyQueue: workqueue.New(),
|
||||
},
|
||||
}
|
||||
for i := 0; i < len(scenario.events); i++ {
|
||||
propagator.eventQueue.Add(scenario.events[i])
|
||||
propagator.processEvent()
|
||||
verifyGraphInvariants(scenario.name, propagator.uidToNode, t)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,7 +18,6 @@ package namespace
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
|
@ -26,10 +25,8 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/typed/discovery"
|
||||
"k8s.io/kubernetes/pkg/client/typed/dynamic"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
@ -452,33 +449,3 @@ func estimateGracefulTerminationForPods(kubeClient clientset.Interface, ns strin
|
|||
}
|
||||
return estimate, nil
|
||||
}
|
||||
|
||||
// ServerPreferredNamespacedGroupVersionResources uses the specified client to discover the set of preferred groupVersionResources that are namespaced
|
||||
func ServerPreferredNamespacedGroupVersionResources(discoveryClient discovery.DiscoveryInterface) ([]unversioned.GroupVersionResource, error) {
|
||||
results := []unversioned.GroupVersionResource{}
|
||||
serverGroupList, err := discoveryClient.ServerGroups()
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
allErrs := []error{}
|
||||
for _, apiGroup := range serverGroupList.Groups {
|
||||
preferredVersion := apiGroup.PreferredVersion
|
||||
apiResourceList, err := discoveryClient.ServerResourcesForGroupVersion(preferredVersion.GroupVersion)
|
||||
if err != nil {
|
||||
allErrs = append(allErrs, err)
|
||||
continue
|
||||
}
|
||||
groupVersion := unversioned.GroupVersion{Group: apiGroup.Name, Version: preferredVersion.Version}
|
||||
for _, apiResource := range apiResourceList.APIResources {
|
||||
if !apiResource.Namespaced {
|
||||
continue
|
||||
}
|
||||
if strings.Contains(apiResource.Name, "/") {
|
||||
continue
|
||||
}
|
||||
results = append(results, groupVersion.WithResource(apiResource.Name))
|
||||
}
|
||||
}
|
||||
return results, utilerrors.NewAggregate(allErrs)
|
||||
}
|
||||
|
|
|
@ -68,6 +68,47 @@ func UnsafeObjectConvertor(scheme *Scheme) ObjectConvertor {
|
|||
return unsafeObjectConvertor{scheme}
|
||||
}
|
||||
|
||||
// SetField puts the value of src, into fieldName, which must be a member of v.
|
||||
// The value of src must be assignable to the field.
|
||||
func SetField(src interface{}, v reflect.Value, fieldName string) error {
|
||||
field := v.FieldByName(fieldName)
|
||||
if !field.IsValid() {
|
||||
return fmt.Errorf("couldn't find %v field in %#v", fieldName, v.Interface())
|
||||
}
|
||||
srcValue := reflect.ValueOf(src)
|
||||
if srcValue.Type().AssignableTo(field.Type()) {
|
||||
field.Set(srcValue)
|
||||
return nil
|
||||
}
|
||||
if srcValue.Type().ConvertibleTo(field.Type()) {
|
||||
field.Set(srcValue.Convert(field.Type()))
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("couldn't assign/convert %v to %v", srcValue.Type(), field.Type())
|
||||
}
|
||||
|
||||
// Field puts the value of fieldName, which must be a member of v, into dest,
|
||||
// which must be a variable to which this field's value can be assigned.
|
||||
func Field(v reflect.Value, fieldName string, dest interface{}) error {
|
||||
field := v.FieldByName(fieldName)
|
||||
if !field.IsValid() {
|
||||
return fmt.Errorf("couldn't find %v field in %#v", fieldName, v.Interface())
|
||||
}
|
||||
destValue, err := conversion.EnforcePtr(dest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if field.Type().AssignableTo(destValue.Type()) {
|
||||
destValue.Set(field)
|
||||
return nil
|
||||
}
|
||||
if field.Type().ConvertibleTo(destValue.Type()) {
|
||||
destValue.Set(field.Convert(destValue.Type()))
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("couldn't assign/convert %v to %v", field.Type(), destValue.Type())
|
||||
}
|
||||
|
||||
// fieldPtr puts the address of fieldName, which must be a member of v,
|
||||
// into dest, which must be an address of a variable to which this field's
|
||||
// address can be assigned.
|
||||
|
|
|
@ -17,6 +17,11 @@ limitations under the License.
|
|||
package runtime
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
@ -196,6 +201,71 @@ func (u *Unstructured) setNestedMap(value map[string]string, fields ...string) {
|
|||
setNestedMap(u.Object, value, fields...)
|
||||
}
|
||||
|
||||
func extractOwnerReference(src interface{}) metatypes.OwnerReference {
|
||||
v := src.(map[string]interface{})
|
||||
return metatypes.OwnerReference{
|
||||
Kind: getNestedString(v, "kind"),
|
||||
Name: getNestedString(v, "name"),
|
||||
APIVersion: getNestedString(v, "apiVersion"),
|
||||
UID: (types.UID)(getNestedString(v, "uid")),
|
||||
}
|
||||
}
|
||||
|
||||
func setOwnerReference(src metatypes.OwnerReference) map[string]interface{} {
|
||||
ret := make(map[string]interface{})
|
||||
setNestedField(ret, src.Kind, "kind")
|
||||
setNestedField(ret, src.Name, "name")
|
||||
setNestedField(ret, src.APIVersion, "apiVersion")
|
||||
setNestedField(ret, string(src.UID), "uid")
|
||||
return ret
|
||||
}
|
||||
|
||||
func getOwnerReferences(object map[string]interface{}) ([]map[string]interface{}, error) {
|
||||
field := getNestedField(object, "metadata", "ownerReferences")
|
||||
if field == nil {
|
||||
return nil, fmt.Errorf("cannot find field metadata.ownerReferences in %v", object)
|
||||
}
|
||||
ownerReferences, ok := field.([]map[string]interface{})
|
||||
if ok {
|
||||
return ownerReferences, nil
|
||||
}
|
||||
// TODO: This is hacky...
|
||||
interfaces, ok := field.([]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect metadata.ownerReferences to be a slice in %#v", object)
|
||||
}
|
||||
ownerReferences = make([]map[string]interface{}, 0, len(interfaces))
|
||||
for i := 0; i < len(interfaces); i++ {
|
||||
r, ok := interfaces[i].(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect element metadata.ownerReferences to be a map[string]interface{} in %#v", object)
|
||||
}
|
||||
ownerReferences = append(ownerReferences, r)
|
||||
}
|
||||
return ownerReferences, nil
|
||||
}
|
||||
|
||||
func (u *Unstructured) GetOwnerReferences() []metatypes.OwnerReference {
|
||||
original, err := getOwnerReferences(u.Object)
|
||||
if err != nil {
|
||||
glog.V(6).Info(err)
|
||||
return nil
|
||||
}
|
||||
ret := make([]metatypes.OwnerReference, 0, len(original))
|
||||
for i := 0; i < len(original); i++ {
|
||||
ret = append(ret, extractOwnerReference(original[i]))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (u *Unstructured) SetOwnerReferences(references []metatypes.OwnerReference) {
|
||||
var newReferences = make([]map[string]interface{}, 0, len(references))
|
||||
for i := 0; i < len(references); i++ {
|
||||
newReferences = append(newReferences, setOwnerReference(references[i]))
|
||||
}
|
||||
u.setNestedField(newReferences, "metadata", "ownerReferences")
|
||||
}
|
||||
|
||||
func (u *Unstructured) GetAPIVersion() string {
|
||||
return getNestedString(u.Object, "apiVersion")
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package runtime
|
|||
import (
|
||||
gojson "encoding/json"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/util/json"
|
||||
|
@ -128,6 +129,12 @@ func (s unstructuredJSONScheme) decodeToList(data []byte, list *UnstructuredList
|
|||
return err
|
||||
}
|
||||
|
||||
// For typed lists, e.g., a PodList, API server doesn't set each item's
|
||||
// APIVersion and Kind. We need to set it.
|
||||
listAPIVersion := list.GetAPIVersion()
|
||||
listKind := list.GetKind()
|
||||
itemKind := strings.TrimSuffix(listKind, "List")
|
||||
|
||||
delete(list.Object, "items")
|
||||
list.Items = nil
|
||||
for _, i := range dList.Items {
|
||||
|
@ -135,6 +142,12 @@ func (s unstructuredJSONScheme) decodeToList(data []byte, list *UnstructuredList
|
|||
if err := s.decodeToUnstructured([]byte(i), unstruct); err != nil {
|
||||
return err
|
||||
}
|
||||
// This is hacky. Set the item's Kind and APIVersion to those inferred
|
||||
// from the List.
|
||||
if len(unstruct.GetKind()) == 0 && len(unstruct.GetAPIVersion()) == 0 {
|
||||
unstruct.SetKind(itemKind)
|
||||
unstruct.SetAPIVersion(listAPIVersion)
|
||||
}
|
||||
list.Items = append(list.Items, unstruct)
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta/metatypes"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/validation"
|
||||
|
@ -141,6 +142,20 @@ func TestUnstructuredGetters(t *testing.T) {
|
|||
"annotations": map[string]interface{}{
|
||||
"test_annotation": "test_value",
|
||||
},
|
||||
"ownerReferences": []map[string]interface{}{
|
||||
{
|
||||
"kind": "Pod",
|
||||
"name": "poda",
|
||||
"apiVersion": "v1",
|
||||
"uid": "1",
|
||||
},
|
||||
{
|
||||
"kind": "Pod",
|
||||
"name": "podb",
|
||||
"apiVersion": "v1",
|
||||
"uid": "2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -192,6 +207,24 @@ func TestUnstructuredGetters(t *testing.T) {
|
|||
if got, want := unstruct.GetAnnotations(), map[string]string{"test_annotation": "test_value"}; !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("GetAnnotations() = %s, want %s", got, want)
|
||||
}
|
||||
refs := unstruct.GetOwnerReferences()
|
||||
expectedOwnerReferences := []metatypes.OwnerReference{
|
||||
{
|
||||
Kind: "Pod",
|
||||
Name: "poda",
|
||||
APIVersion: "v1",
|
||||
UID: "1",
|
||||
},
|
||||
{
|
||||
Kind: "Pod",
|
||||
Name: "podb",
|
||||
APIVersion: "v1",
|
||||
UID: "2",
|
||||
},
|
||||
}
|
||||
if got, want := refs, expectedOwnerReferences; !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("GetOwnerReference()=%v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnstructuredSetters(t *testing.T) {
|
||||
|
@ -216,6 +249,20 @@ func TestUnstructuredSetters(t *testing.T) {
|
|||
"annotations": map[string]interface{}{
|
||||
"test_annotation": "test_value",
|
||||
},
|
||||
"ownerReferences": []map[string]interface{}{
|
||||
{
|
||||
"kind": "Pod",
|
||||
"name": "poda",
|
||||
"apiVersion": "v1",
|
||||
"uid": "1",
|
||||
},
|
||||
{
|
||||
"kind": "Pod",
|
||||
"name": "podb",
|
||||
"apiVersion": "v1",
|
||||
"uid": "2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -233,9 +280,24 @@ func TestUnstructuredSetters(t *testing.T) {
|
|||
unstruct.SetDeletionTimestamp(&date)
|
||||
unstruct.SetLabels(map[string]string{"test_label": "test_value"})
|
||||
unstruct.SetAnnotations(map[string]string{"test_annotation": "test_value"})
|
||||
newOwnerReferences := []metatypes.OwnerReference{
|
||||
{
|
||||
Kind: "Pod",
|
||||
Name: "poda",
|
||||
APIVersion: "v1",
|
||||
UID: "1",
|
||||
},
|
||||
{
|
||||
Kind: "Pod",
|
||||
Name: "podb",
|
||||
APIVersion: "v1",
|
||||
UID: "2",
|
||||
},
|
||||
}
|
||||
unstruct.SetOwnerReferences(newOwnerReferences)
|
||||
|
||||
if !reflect.DeepEqual(unstruct, want) {
|
||||
t.Errorf("Wanted: \n%s\n Got:\n%s", unstruct, want)
|
||||
t.Errorf("Wanted: \n%s\n Got:\n%s", want, unstruct)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,374 @@
|
|||
// +build integration,!no-etcd
|
||||
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/client/typed/dynamic"
|
||||
"k8s.io/kubernetes/pkg/controller/garbagecollector"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
const garbageCollectedPodName = "test.pod.1"
|
||||
const independentPodName = "test.pod.2"
|
||||
const oneValidOwnerPodName = "test.pod.3"
|
||||
const toBeDeletedRCName = "test.rc.1"
|
||||
const remainingRCName = "test.rc.2"
|
||||
|
||||
func newPod(podName string, ownerReferences []v1.OwnerReference) *v1.Pod {
|
||||
for i := 0; i < len(ownerReferences); i++ {
|
||||
ownerReferences[i].Kind = "ReplicationController"
|
||||
ownerReferences[i].APIVersion = "v1"
|
||||
}
|
||||
return &v1.Pod{
|
||||
TypeMeta: unversioned.TypeMeta{
|
||||
Kind: "Pod",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: podName,
|
||||
Namespace: framework.TestNS,
|
||||
OwnerReferences: ownerReferences,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "fake-name",
|
||||
Image: "fakeimage",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newOwnerRC(name string) *v1.ReplicationController {
|
||||
return &v1.ReplicationController{
|
||||
TypeMeta: unversioned.TypeMeta{
|
||||
Kind: "ReplicationController",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Namespace: framework.TestNS,
|
||||
Name: name,
|
||||
},
|
||||
Spec: v1.ReplicationControllerSpec{
|
||||
Selector: map[string]string{"name": "test"},
|
||||
Template: &v1.PodTemplateSpec{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Labels: map[string]string{"name": "test"},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "fake-name",
|
||||
Image: "fakeimage",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func observePodDeletion(t *testing.T, w watch.Interface) (deletedPod *api.Pod) {
|
||||
deleted := false
|
||||
timeout := false
|
||||
timer := time.After(60 * time.Second)
|
||||
for !deleted && !timeout {
|
||||
select {
|
||||
case event, _ := <-w.ResultChan():
|
||||
if event.Type == watch.Deleted {
|
||||
// TODO: used the commented code once we fix the client.
|
||||
// deletedPod = event.Object.(*v1.Pod)
|
||||
deletedPod = event.Object.(*api.Pod)
|
||||
deleted = true
|
||||
}
|
||||
case <-timer:
|
||||
timeout = true
|
||||
}
|
||||
}
|
||||
if !deleted {
|
||||
t.Fatalf("Failed to observe pod deletion")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func setup(t *testing.T) (*garbagecollector.GarbageCollector, clientset.Interface) {
|
||||
var m *master.Master
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
m.Handler.ServeHTTP(w, req)
|
||||
}))
|
||||
// TODO: close the http server
|
||||
|
||||
masterConfig := framework.NewIntegrationTestMasterConfig()
|
||||
masterConfig.EnableCoreControllers = false
|
||||
m, err := master.New(masterConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Error in bringing up the master: %v", err)
|
||||
}
|
||||
|
||||
framework.DeleteAllEtcdKeys()
|
||||
clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL})
|
||||
if err != nil {
|
||||
t.Fatalf("Error in create clientset: %v", err)
|
||||
}
|
||||
groupVersionResources, err := clientSet.Discovery().ServerPreferredResources()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get supported resources from server: %v", err)
|
||||
}
|
||||
clientPool := dynamic.NewClientPool(&restclient.Config{Host: s.URL}, dynamic.LegacyAPIPathResolverFunc)
|
||||
gc, err := garbagecollector.NewGarbageCollector(clientPool, groupVersionResources)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create garbage collector")
|
||||
}
|
||||
return gc, clientSet
|
||||
}
|
||||
|
||||
// This test simulates the cascading deletion.
|
||||
func TestCascadingDeletion(t *testing.T) {
|
||||
gc, clientSet := setup(t)
|
||||
rcClient := clientSet.Core().ReplicationControllers(framework.TestNS)
|
||||
podClient := clientSet.Core().Pods(framework.TestNS)
|
||||
|
||||
toBeDeletedRC, err := rcClient.Create(newOwnerRC(toBeDeletedRCName))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create replication controller: %v", err)
|
||||
}
|
||||
remainingRC, err := rcClient.Create(newOwnerRC(remainingRCName))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create replication controller: %v", err)
|
||||
}
|
||||
|
||||
rcs, err := rcClient.List(api.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to list replication controllers: %v", err)
|
||||
}
|
||||
if len(rcs.Items) != 2 {
|
||||
t.Fatalf("Expect only 2 replication controller")
|
||||
}
|
||||
|
||||
// this pod should be cascadingly deleted.
|
||||
pod := newPod(garbageCollectedPodName, []v1.OwnerReference{{UID: toBeDeletedRC.ObjectMeta.UID, Name: toBeDeletedRCName}})
|
||||
_, err = podClient.Create(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Pod: %v", err)
|
||||
}
|
||||
|
||||
// this pod shouldn't be cascadingly deleted, because it has a valid referenece.
|
||||
pod = newPod(oneValidOwnerPodName, []v1.OwnerReference{
|
||||
{UID: toBeDeletedRC.ObjectMeta.UID, Name: toBeDeletedRCName},
|
||||
{UID: remainingRC.ObjectMeta.UID, Name: remainingRCName},
|
||||
})
|
||||
_, err = podClient.Create(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Pod: %v", err)
|
||||
}
|
||||
|
||||
// this pod shouldn't be cascadingly deleted, because it doesn't have an owner.
|
||||
pod = newPod(independentPodName, []v1.OwnerReference{})
|
||||
_, err = podClient.Create(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Pod: %v", err)
|
||||
}
|
||||
|
||||
// set up watch
|
||||
pods, err := podClient.List(api.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to list pods: %v", err)
|
||||
}
|
||||
if len(pods.Items) != 3 {
|
||||
t.Fatalf("Expect only 3 pods")
|
||||
}
|
||||
options := api.ListOptions{
|
||||
ResourceVersion: pods.ListMeta.ResourceVersion,
|
||||
}
|
||||
w, err := podClient.Watch(options)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to set up watch: %v", err)
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
go gc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
// delete one of the replication controller
|
||||
if err := rcClient.Delete(toBeDeletedRCName, nil); err != nil {
|
||||
t.Fatalf("failed to delete replication controller: %v", err)
|
||||
}
|
||||
|
||||
deletedPod := observePodDeletion(t, w)
|
||||
if deletedPod == nil {
|
||||
t.Fatalf("empty deletedPod")
|
||||
}
|
||||
if deletedPod.Name != garbageCollectedPodName {
|
||||
t.Fatalf("deleted unexpected pod: %v", *deletedPod)
|
||||
}
|
||||
// wait for another 30 seconds to give garbage collect a chance to make mistakes.
|
||||
time.Sleep(30 * time.Second)
|
||||
// checks the garbage collect doesn't delete pods it shouldn't do.
|
||||
if _, err := podClient.Get(independentPodName); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := podClient.Get(oneValidOwnerPodName); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// This test simulates the case where an object is created with an owner that
|
||||
// doesn't exist. It verifies the GC will delete such an object.
|
||||
func TestCreateWithNonExisitentOwner(t *testing.T) {
|
||||
gc, clientSet := setup(t)
|
||||
podClient := clientSet.Core().Pods(framework.TestNS)
|
||||
|
||||
pod := newPod(garbageCollectedPodName, []v1.OwnerReference{{UID: "doesn't matter", Name: toBeDeletedRCName}})
|
||||
_, err := podClient.Create(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Pod: %v", err)
|
||||
}
|
||||
|
||||
// set up watch
|
||||
pods, err := podClient.List(api.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to list pods: %v", err)
|
||||
}
|
||||
if len(pods.Items) != 1 {
|
||||
t.Fatalf("Expect only 1 pod")
|
||||
}
|
||||
options := api.ListOptions{
|
||||
ResourceVersion: pods.ListMeta.ResourceVersion,
|
||||
}
|
||||
w, err := podClient.Watch(options)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to set up watch: %v", err)
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
go gc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
deletedPod := observePodDeletion(t, w)
|
||||
if deletedPod == nil {
|
||||
t.Fatalf("empty deletedPod")
|
||||
}
|
||||
if deletedPod.Name != garbageCollectedPodName {
|
||||
t.Fatalf("deleted unexpected pod: %v", *deletedPod)
|
||||
}
|
||||
}
|
||||
|
||||
func createRemoveRCsPods(t *testing.T, clientSet clientset.Interface, id int, wg *sync.WaitGroup, rcUIDs chan types.UID) {
|
||||
defer wg.Done()
|
||||
rcClient := clientSet.Core().ReplicationControllers(framework.TestNS)
|
||||
podClient := clientSet.Core().Pods(framework.TestNS)
|
||||
// create rc.
|
||||
rcName := toBeDeletedRCName + strconv.Itoa(id)
|
||||
toBeDeletedRC, err := rcClient.Create(newOwnerRC(rcName))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create replication controller: %v", err)
|
||||
}
|
||||
rcUIDs <- toBeDeletedRC.ObjectMeta.UID
|
||||
// create pods. These pods should be cascadingly deleted.
|
||||
for j := 0; j < 3; j++ {
|
||||
podName := garbageCollectedPodName + strconv.Itoa(id) + "-" + strconv.Itoa(j)
|
||||
pod := newPod(podName, []v1.OwnerReference{{UID: toBeDeletedRC.ObjectMeta.UID, Name: rcName}})
|
||||
_, err = podClient.Create(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Pod: %v", err)
|
||||
}
|
||||
}
|
||||
// delete the rc
|
||||
if err := rcClient.Delete(rcName, nil); err != nil {
|
||||
t.Fatalf("failed to delete replication controller: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func allObjectsRemoved(clientSet clientset.Interface) (bool, error) {
|
||||
rcClient := clientSet.Core().ReplicationControllers(framework.TestNS)
|
||||
podClient := clientSet.Core().Pods(framework.TestNS)
|
||||
pods, err := podClient.List(api.ListOptions{})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Failed to list pods: %v", err)
|
||||
}
|
||||
if len(pods.Items) != 0 {
|
||||
return false, nil
|
||||
}
|
||||
rcs, err := rcClient.List(api.ListOptions{})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Failed to list replication controllers: %v", err)
|
||||
}
|
||||
if len(rcs.Items) != 0 {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// This stress test the garbage collector
|
||||
func TestStressingCascadingDeletion(t *testing.T) {
|
||||
t.Logf("starts garbage collector stress test")
|
||||
gc, clientSet := setup(t)
|
||||
stopCh := make(chan struct{})
|
||||
go gc.Run(5, stopCh)
|
||||
defer close(stopCh)
|
||||
|
||||
const collections = 50
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(collections)
|
||||
rcUIDs := make(chan types.UID, collections)
|
||||
for i := 0; i < collections; i++ {
|
||||
go createRemoveRCsPods(t, clientSet, i, &wg, rcUIDs)
|
||||
}
|
||||
wg.Wait()
|
||||
t.Logf("all pods are created, all replications controllers are created then deleted")
|
||||
// wait for the garbage collector to drain its queue
|
||||
if err := wait.Poll(10*time.Second, 300*time.Second, func() (bool, error) {
|
||||
return gc.QueuesDrained(), nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("garbage collector queues drained")
|
||||
// wait for all replication controllers and pods to be deleted. This
|
||||
// shouldn't take long, because the queues are already drained.
|
||||
if err := wait.Poll(5*time.Second, 30*time.Second, func() (bool, error) {
|
||||
return allObjectsRemoved(clientSet)
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Logf("all replication controllers and pods are deleted")
|
||||
|
||||
// verify there is no node representing replication controllers in the gc's graph
|
||||
uids := make([]types.UID, 0, collections)
|
||||
for i := 0; i < collections; i++ {
|
||||
uid := <-rcUIDs
|
||||
uids = append(uids, uid)
|
||||
}
|
||||
if gc.GraphHasUID(uids) {
|
||||
t.Errorf("Expect all nodes representing replication controllers are removed from the Propagator's graph")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue