Merge pull request #46677 from enisoc/tpr-migrate-etcd

Automatic merge from submit-queue (batch tested with PRs 43505, 45168, 46439, 46677, 46623)

Add TPR to CRD migration helper.

This is a helper for migrating TPR data to CustomResource. It's rather hacky because it requires crossing apiserver boundaries, but doing it this way keeps the mess contained to the TPR code, which is scheduled for deletion anyway.

It's also not completely hands-free because making it resilient enough to be completely automated is too involved to be worth it for an alpha-to-beta migration, and would require investing significant effort to fix up soon-to-be-deleted TPR code. Instead, this feature will be documented as a best-effort helper whose results should be verified by hand.

The intended benefit of this over a totally manual process is that it should be possible to copy TPR data into a CRD without having to tear everything down in the middle. The process would look like this:

1. Upgrade to k8s 1.7. Nothing happens to your TPRs.
1. Create CRD with group/version and resource names that match the TPR. Still nothing happens to your TPRs, as the CRD is hidden by the overlapping TPR.
1. Delete the TPR. The TPR data is converted to CustomResource data, and the CRD begins serving at the same REST path.

Note that the old TPR data is left behind by this process, so watchers should not receive DELETE events. This also means the user can revert to the pre-migration state by recreating the TPR definition.

Ref. https://github.com/kubernetes/kubernetes/issues/45728
pull/6/head
Kubernetes Submit Queue 2017-06-01 05:43:44 -07:00 committed by GitHub
commit 98e5496aa2
13 changed files with 313 additions and 26 deletions

View File

@ -82,6 +82,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",

View File

@ -47,6 +47,7 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authorization/authorizer"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
@ -117,7 +118,7 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
return err
}
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers)
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, apiExtensionsConfig.CRDRESTOptionsGetter)
if err != nil {
return err
}
@ -161,8 +162,8 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
}
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, crdRESTOptionsGetter genericregistry.RESTOptionsGetter) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer, crdRESTOptionsGetter)
if err != nil {
return nil, err
}

View File

@ -181,6 +181,7 @@ function kubectl-with-retry()
# wait-for-pods-with-label "app=foo" "nginx-0nginx-1"
function wait-for-pods-with-label()
{
local i
for i in $(seq 1 10); do
kubeout=`kubectl get po -l $1 --template '{{range.items}}{{.metadata.name}}{{end}}' --sort-by metadata.name "${kube_flags[@]}"`
if [[ $kubeout = $2 ]]; then
@ -1415,6 +1416,101 @@ __EOF__
kubectl delete thirdpartyresources/bar.company.com "${kube_flags[@]}"
}
run_tpr_migration_tests() {
local i tries
create_and_use_new_namespace
# Create CRD first. This is sort of backwards so we can create a marker below.
kubectl "${kube_flags_with_token[@]}" create -f - << __EOF__
{
"kind": "CustomResourceDefinition",
"apiVersion": "apiextensions.k8s.io/v1beta1",
"metadata": {
"name": "foos.company.crd"
},
"spec": {
"group": "company.crd",
"version": "v1",
"names": {
"plural": "foos",
"kind": "Foo"
}
}
}
__EOF__
# Wait for API to become available.
tries=0
until kubectl "${kube_flags[@]}" get foos.company.crd || [ $tries -gt 10 ]; do
tries=$((tries+1))
sleep ${tries}
done
kube::test::get_object_assert foos.company.crd '{{len .items}}' '0'
# Create a marker that only exists in CRD so we know when CRD is active vs. TPR.
kubectl "${kube_flags[@]}" create -f - << __EOF__
{
"kind": "Foo",
"apiVersion": "company.crd/v1",
"metadata": {
"name": "crd-marker"
},
"testValue": "only exists in CRD"
}
__EOF__
kube::test::get_object_assert foos.company.crd '{{len .items}}' '1'
# Now create a TPR that sits in front of the CRD and hides it.
kubectl "${kube_flags[@]}" create -f - << __EOF__
{
"kind": "ThirdPartyResource",
"apiVersion": "extensions/v1beta1",
"metadata": {
"name": "foo.company.crd"
},
"versions": [
{
"name": "v1"
}
]
}
__EOF__
# The marker should disappear.
kube::test::wait_object_assert foos.company.crd '{{len .items}}' '0'
# Add some items to the TPR.
for i in {1..10}; do
kubectl "${kube_flags[@]}" create -f - << __EOF__
{
"kind": "Foo",
"apiVersion": "company.crd/v1",
"metadata": {
"name": "tpr-${i}"
},
"testValue": "migrate-${i}"
}
__EOF__
done
kube::test::get_object_assert foos.company.crd '{{len .items}}' '10'
# Delete the TPR and wait for the CRD to take over.
kubectl "${kube_flags[@]}" delete thirdpartyresource/foo.company.crd
tries=0
until kubectl "${kube_flags[@]}" get foos.company.crd/crd-marker || [ $tries -gt 10 ]; do
tries=$((tries+1))
sleep ${tries}
done
kube::test::get_object_assert foos.company.crd/crd-marker '{{.testValue}}' 'only exists in CRD'
# Check if the TPR items were migrated to CRD.
kube::test::get_object_assert foos.company.crd '{{len .items}}' '11'
for i in {1..10}; do
kube::test::get_object_assert foos.company.crd/tpr-${i} '{{.testValue}}' "migrate-${i}"
done
# teardown
kubectl delete customresourcedefinitions/foos.company.crd "${kube_flags_with_token[@]}"
}
kube::util::non_native_resources() {
local times
@ -2953,12 +3049,12 @@ runTests() {
kube::log::status "Checking kubectl version"
kubectl version
i=0
ns_num=0
create_and_use_new_namespace() {
i=$(($i+1))
kube::log::status "Creating namespace namespace${i}"
kubectl create namespace "namespace${i}"
kubectl config set-context "${CONTEXT}" --namespace="namespace${i}"
ns_num=$(($ns_num+1))
kube::log::status "Creating namespace namespace${ns_num}"
kubectl create namespace "namespace${ns_num}"
kubectl config set-context "${CONTEXT}" --namespace="namespace${ns_num}"
}
kube_flags=(
@ -3292,6 +3388,9 @@ runTests() {
if kube::test::if_supports_resource "${thirdpartyresources}" ; then
run_tpr_tests
if kube::test::if_supports_resource "${customresourcedefinitions}" ; then
run_tpr_migration_tests
fi
fi
#################

View File

@ -28,6 +28,7 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
serverstorage "k8s.io/apiserver/pkg/server/storage"
@ -210,7 +211,7 @@ func (c *Config) SkipComplete() completedConfig {
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
// KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget, crdRESTOptionsGetter genericregistry.RESTOptionsGetter) (*Master, error) {
if reflect.DeepEqual(c.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
@ -254,7 +255,8 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, s.DiscoveryGroupManager, c.StorageFactory)},
// TODO(enisoc): Remove crdRESTOptionsGetter input argument when TPR code is removed.
extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, s.DiscoveryGroupManager, c.StorageFactory, crdRESTOptionsGetter)},
networkingrest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},

View File

@ -54,7 +54,7 @@ func TestValidOpenAPISpec(t *testing.T) {
}
config.GenericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
master, err := config.Complete().New(genericapiserver.EmptyDelegate)
master, err := config.Complete().New(genericapiserver.EmptyDelegate, nil)
if err != nil {
t.Fatalf("Error in bringing up the master: %v", err)
}

View File

@ -115,7 +115,7 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
func newMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
etcdserver, config, assert := setUp(t)
master, err := config.Complete().New(genericapiserver.EmptyDelegate)
master, err := config.Complete().New(genericapiserver.EmptyDelegate, nil)
if err != nil {
t.Fatalf("Error in bringing up the master: %v", err)
}
@ -141,7 +141,7 @@ func limitedAPIResourceConfigSource() *serverstorage.ResourceConfig {
func newLimitedMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
etcdserver, config, assert := setUp(t)
config.APIResourceConfigSource = limitedAPIResourceConfigSource()
master, err := config.Complete().New(genericapiserver.EmptyDelegate)
master, err := config.Complete().New(genericapiserver.EmptyDelegate, nil)
if err != nil {
t.Fatalf("Error in bringing up the master: %v", err)
}

View File

@ -27,9 +27,12 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints:go_default_library",
@ -40,12 +43,16 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/apis/apiextensions:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/apiserver:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/client/listers/apiextensions/internalversion:go_default_library",
"//vendor/k8s.io/kube-apiextensions-server/pkg/registry/customresource:go_default_library",
],
)

View File

@ -25,16 +25,26 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/json"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
genericapi "k8s.io/apiserver/pkg/endpoints"
"k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/endpoints/request"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorgage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
discoveryclient "k8s.io/client-go/discovery"
"k8s.io/kube-apiextensions-server/pkg/apis/apiextensions"
apiextensionsserver "k8s.io/kube-apiextensions-server/pkg/apiserver"
apiextensionsclient "k8s.io/kube-apiextensions-server/pkg/client/clientset/internalclientset/typed/apiextensions/internalversion"
"k8s.io/kube-apiextensions-server/pkg/registry/customresource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest"
@ -71,13 +81,16 @@ type ThirdPartyResourceServer struct {
// Useful for reliable testing. Shouldn't be used otherwise.
disableThirdPartyControllerForTesting bool
crdRESTOptionsGetter generic.RESTOptionsGetter
}
func NewThirdPartyResourceServer(genericAPIServer *genericapiserver.GenericAPIServer, availableGroupManager discovery.GroupManager, storageFactory serverstorgage.StorageFactory) *ThirdPartyResourceServer {
func NewThirdPartyResourceServer(genericAPIServer *genericapiserver.GenericAPIServer, availableGroupManager discovery.GroupManager, storageFactory serverstorgage.StorageFactory, crdRESTOptionsGetter generic.RESTOptionsGetter) *ThirdPartyResourceServer {
ret := &ThirdPartyResourceServer{
genericAPIServer: genericAPIServer,
thirdPartyResources: map[string]*thirdPartyEntry{},
availableGroupManager: availableGroupManager,
crdRESTOptionsGetter: crdRESTOptionsGetter,
}
var err error
@ -130,7 +143,7 @@ func (m *ThirdPartyResourceServer) removeThirdPartyStorage(path, resource string
if !found {
return nil
}
if err := m.removeAllThirdPartyResources(storage); err != nil {
if err := m.removeThirdPartyResourceData(&entry.group, resource, storage); err != nil {
return err
}
delete(entry.storage, resource)
@ -166,25 +179,132 @@ func (m *ThirdPartyResourceServer) RemoveThirdPartyResource(path string) error {
return nil
}
func (m *ThirdPartyResourceServer) removeAllThirdPartyResources(registry *thirdpartyresourcedatastore.REST) error {
ctx := genericapirequest.NewDefaultContext()
func (m *ThirdPartyResourceServer) removeThirdPartyResourceData(group *metav1.APIGroup, resource string, registry *thirdpartyresourcedatastore.REST) error {
// Freeze TPR data to prevent new writes via this apiserver process.
// Other apiservers can still write. This is best-effort because there
// are worse problems with TPR data than the possibility of going back
// in time when migrating to CRD [citation needed].
registry.Freeze()
ctx := genericapirequest.NewContext()
existingData, err := registry.List(ctx, nil)
if err != nil {
return err
}
list, ok := existingData.(*extensions.ThirdPartyResourceDataList)
if !ok {
return fmt.Errorf("expected a *ThirdPartyResourceDataList, got %#v", list)
return fmt.Errorf("expected a *ThirdPartyResourceDataList, got %T", existingData)
}
for ix := range list.Items {
item := &list.Items[ix]
if _, _, err := registry.Delete(ctx, item.Name, nil); err != nil {
// Migrate TPR data to CRD if requested.
gvk := schema.GroupVersionKind{Group: group.Name, Version: group.PreferredVersion.Version, Kind: registry.Kind()}
migrationRequested, err := m.migrateThirdPartyResourceData(gvk, resource, list)
if err != nil {
// Migration is best-effort. Log and continue.
utilruntime.HandleError(fmt.Errorf("failed to migrate TPR data: %v", err))
}
// Skip deletion of TPR data if migration was requested (whether or not it succeeded).
// This leaves the etcd data around for rollback, and to avoid sending DELETE watch events.
if migrationRequested {
return nil
}
for i := range list.Items {
item := &list.Items[i]
// Use registry.Store.Delete() to bypass the frozen registry.Delete().
if _, _, err := registry.Store.Delete(genericapirequest.WithNamespace(ctx, item.Namespace), item.Name, nil); err != nil {
return err
}
}
return nil
}
func (m *ThirdPartyResourceServer) findMatchingCRD(gvk schema.GroupVersionKind, resource string) (*apiextensions.CustomResourceDefinition, error) {
// CustomResourceDefinitionList does not implement the protobuf marshalling interface.
config := *m.genericAPIServer.LoopbackClientConfig
config.ContentType = "application/json"
crdClient, err := apiextensionsclient.NewForConfig(&config)
if err != nil {
return nil, fmt.Errorf("can't create apiextensions client: %v", err)
}
crdList, err := crdClient.CustomResourceDefinitions().List(metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("can't list CustomResourceDefinitions: %v", err)
}
for i := range crdList.Items {
item := &crdList.Items[i]
if item.Spec.Scope == apiextensions.NamespaceScoped &&
item.Spec.Group == gvk.Group && item.Spec.Version == gvk.Version &&
item.Status.AcceptedNames.Kind == gvk.Kind && item.Status.AcceptedNames.Plural == resource {
return item, nil
}
}
return nil, nil
}
func (m *ThirdPartyResourceServer) migrateThirdPartyResourceData(gvk schema.GroupVersionKind, resource string, dataList *extensions.ThirdPartyResourceDataList) (bool, error) {
// A matching CustomResourceDefinition implies migration is requested.
crd, err := m.findMatchingCRD(gvk, resource)
if err != nil {
return false, fmt.Errorf("can't determine if TPR should migrate: %v", err)
}
if crd == nil {
// No migration requested.
return false, nil
}
// Talk directly to CustomResource storage.
// We have to bypass the API server because TPR is shadowing CRD at this point.
storage := customresource.NewREST(
schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural},
schema.GroupVersionKind{Group: crd.Spec.Group, Version: crd.Spec.Version, Kind: crd.Spec.Names.ListKind},
apiextensionsserver.UnstructuredCopier{},
customresource.NewStrategy(discoveryclient.NewUnstructuredObjectTyper(nil), true),
m.crdRESTOptionsGetter,
)
// Copy TPR data to CustomResource.
var errs []error
ctx := request.NewContext()
for i := range dataList.Items {
item := &dataList.Items[i]
// Convert TPR data to Unstructured.
objMap := make(map[string]interface{})
if err := json.Unmarshal(item.Data, &objMap); err != nil {
errs = append(errs, fmt.Errorf("can't unmarshal TPR data %q: %v", item.Name, err))
continue
}
// Convert metadata to Unstructured and merge with data.
// cf. thirdpartyresourcedata.encodeToJSON()
metaMap := make(map[string]interface{})
buf, err := json.Marshal(&item.ObjectMeta)
if err != nil {
errs = append(errs, fmt.Errorf("can't marshal metadata for TPR data %q: %v", item.Name, err))
continue
}
if err := json.Unmarshal(buf, &metaMap); err != nil {
errs = append(errs, fmt.Errorf("can't unmarshal TPR data %q: %v", item.Name, err))
continue
}
// resourceVersion cannot be set when creating objects.
delete(metaMap, "resourceVersion")
objMap["metadata"] = metaMap
// Store CustomResource.
obj := &unstructured.Unstructured{Object: objMap}
createCtx := request.WithNamespace(ctx, obj.GetNamespace())
if _, err := storage.Create(createCtx, obj); err != nil {
errs = append(errs, fmt.Errorf("can't create CustomResource for TPR data %q: %v", item.Name, err))
continue
}
}
return true, utilerrors.NewAggregate(errs)
}
// ListThirdPartyResources lists all currently installed third party resources
// The format is <path>/<resource-plural-name>
func (m *ThirdPartyResourceServer) ListThirdPartyResources() []string {

View File

@ -35,9 +35,14 @@ go_library(
"//pkg/apis/extensions:go_default_library",
"//pkg/registry/cachesize:go_default_library",
"//pkg/registry/extensions/thirdpartyresourcedata:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
],
)

View File

@ -18,20 +18,72 @@ package storage
import (
"strings"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
)
// REST implements a RESTStorage for ThirdPartyResourceData
// errFrozen is a transient error to indicate that clients should retry with backoff.
var errFrozen = errors.NewServiceUnavailable("TPR data is temporarily frozen")
// REST implements a RESTStorage for ThirdPartyResourceData.
type REST struct {
*genericregistry.Store
kind string
kind string
frozen atomic.Value
}
// Freeze causes all future calls to Create/Update/Delete/DeleteCollection to return a transient error.
// This is irreversible and meant for use when the TPR data is being deleted or migrated/abandoned.
func (r *REST) Freeze() {
r.frozen.Store(true)
}
func (r *REST) isFrozen() bool {
return r.frozen.Load() != nil
}
// Create is a wrapper to support Freeze.
func (r *REST) Create(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) {
if r.isFrozen() {
return nil, errFrozen
}
return r.Store.Create(ctx, obj)
}
// Update is a wrapper to support Freeze.
func (r *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) {
if r.isFrozen() {
return nil, false, errFrozen
}
return r.Store.Update(ctx, name, objInfo)
}
// Delete is a wrapper to support Freeze.
func (r *REST) Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
if r.isFrozen() {
return nil, false, errFrozen
}
return r.Store.Delete(ctx, name, options)
}
// DeleteCollection is a wrapper to support Freeze.
func (r *REST) DeleteCollection(ctx genericapirequest.Context, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
if r.isFrozen() {
return nil, errFrozen
}
return r.Store.DeleteCollection(ctx, options, listOptions)
}
// NewREST returns a registry which will store ThirdPartyResourceData in the given helper

View File

@ -607,7 +607,7 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
kubeAPIServerConfig.APIResourceConfigSource = &allResourceSource{} // force enable all resources
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers, nil)
if err != nil {
t.Fatal(err)
}

View File

@ -118,7 +118,7 @@ func TestAggregatedAPIServer(t *testing.T) {
}
kubeClientConfigValue.Store(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers, nil)
if err != nil {
t.Fatal(err)
}

View File

@ -253,7 +253,7 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
}
masterConfig.GenericConfig.SharedInformerFactory = extinformers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout)
m, err = masterConfig.Complete().New(genericapiserver.EmptyDelegate)
m, err = masterConfig.Complete().New(genericapiserver.EmptyDelegate, nil)
if err != nil {
closeFn()
glog.Fatalf("error in bringing up the master: %v", err)